Reference

shared.caching.cache.AlanCache

AlanCache()

Two-layer caching system with local RAM and shared Redis backends.

This class implements a sophisticated caching mechanism that combines fast local RAM caching with persistent shared Redis caching. It automatically manages both layers and provides manual cache operations, decorators for function caching, and advanced features like async computation and cache warming.

The two-layer approach provides:
  • Ultra-fast local access for frequently used data
  • Shared persistence across application instances via Redis
  • Automatic fallback between layers
  • Configurable backend selection per use case

Attributes:

  • disable_cache: Global flag to disable all caching operations for testing.
  • shared_cache: Primary Redis cache backend for shared data.
  • shared_cache_atomic: Redis cache with atomic operations for race prevention.
  • local_cache: Fast in-memory cache for local data.
  • local_cache_no_serializer: Local cache without serialization for objects.

Examples:

Basic cache operations:

>>> cache = AlanCache()
>>> cache.set("key", {"data": "value"}, timedelta(minutes=30))
>>> result = cache.get("key")
>>> cache.delete("key")

Check cache layers:

>>> if cache.has("key"):
...     data = cache.get("key")  # Retrieved from fastest available layer
Note
  • Automatically initializes from environment or app configuration
  • Local cache is checked first for performance
  • Redis cache provides persistence and cross-instance sharing
  • Configuration via CACHE_TYPE, LOCALCACHE_TYPE environment variables
  • Thread-safe for concurrent access
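
For tests, all caching can be switched off via the disable_cache flag documented above (a minimal sketch, using the module-level alan_cache instance):

>>> alan_cache.disable_cache = True  # caching operations become pass-through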
Source code in shared/caching/cache.py
def __init__(self) -> None:
    self.shared_cache = Cache()
    self.shared_cache_atomic = Cache()
    self.local_cache = Cache()
    self.local_cache_no_serializer = Cache()
    # used to manage scheduled async cache rebuilds (when run in the scheduled code)
    redis, _is_not_faked = get_redis_caching_connection()
    self.redis = redis

    if not self._try_init_from_env():
        self._init_with_defaults()

clear_all_cache

clear_all_cache()

DANGER: Deletes all cached entries across all cache layers.

Source code in shared/caching/cache.py
def clear_all_cache(self) -> None:
    """
    DANGER: Deletes all cached entries across all cache layers.
    """
    if is_production_mode():
        raise Exception("clear_all_cache() is not allowed in production mode")
    self.local_cache.clear()
    self.local_cache_no_serializer.clear()
    self.shared_cache.clear()
    self.shared_cache_atomic.clear()

clear_cached_func

clear_cached_func(func, *args, **kwargs)

Deletes the specified function's cached values, based on the given parameters.

Source code in shared/caching/cache.py
def clear_cached_func(self, func, *args, **kwargs) -> bool:  # type: ignore[no-untyped-def]
    """
    Deletes the specified function's cached values, based on the given parameters
    """
    cache_key = func.make_cache_key(*args, **kwargs)
    return self.delete(cache_key)
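
A usage sketch, reusing the hypothetical get_user_data function from the @cached_for examples below:

>>> @cached_for(minutes=30)
... def get_user_data(user_id: int):
...     return fetch_user_from_database(user_id)
>>> alan_cache.clear_cached_func(get_user_data, 42)  # drop the cached value for user 42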

clear_cached_func_all

clear_cached_func_all(func)

Deletes all cached values of the specified function (ignoring the parameters).

Source code in shared/caching/cache.py
def clear_cached_func_all(self, func: Callable[..., Any]) -> None:
    """
    Deletes all cached values of the specified function (ignoring the parameters)
    """

    funcname = _funcname(func)
    self.delete_from_funcname(
        funcname,
        local_ram_cache_only=func.local_ram_cache_only,  # type: ignore[attr-defined]
    )
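
A usage sketch (get_user_data is the hypothetical function from above); the decorated function also exposes this operation as its clear_cache attribute:

>>> alan_cache.clear_cached_func_all(get_user_data)
>>> get_user_data.clear_cache()  # equivalent shortcut set by the caching decorator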

clear_cached_func_some

clear_cached_func_some(func, *args, **kwargs)

Asynchronously deletes some of the specified function's cached values, based on a subset of the given parameters, in both the local cache and the shared (Redis) cache.

Returns a tuple indicating how many keys have been deleted so far on Redis, and whether the process is finished.

WARNING 0: this function only works if the cache decorator applied to the function used the cache_key_with_full_args option.

WARNING 1: this function deletes the keys asynchronously; if you need to wait until all cache keys are deleted, interpret its return value and call it again (see the polling sketch after the source below).

WARNING 2: it doesn't support signatures with keyword-only or positional-only arguments, nor omitting arguments captured by *args or **kwargs.

In the following signature, only the params b and c will be recognized, so clear_cached_func_some won't work properly:

def func(a, /, b, c, *args, d, **kwargs)

Source code in shared/caching/cache.py
def clear_cached_func_some(self, func, *args, **kwargs) -> tuple[int, bool]:  # type: ignore[no-untyped-def]
    """Deletes asynchronously some of the the specified function cached
    values, based on some of the given parameters, in local cache and shared
    (Redis) cache

    Returns a tuple saying how many keys have been deleted so far on Redis,
    and if the process is finished or not.

    WARNING 0: this function will only work if you used
    `cache_key_with_full_args` option in the cache decorator that you
    applied to the function

    WARNING 1: this function deletes the keys asynchronously, if you need to
    wait until all cache keys are deleted you should interpret its return
    values and call it again.

    WARNING 2: it doesn't support signatures with *keyword-only* arguments
    or *positional-only* arguments.  Also it doesn't support omitting
    arguments in *args or **kwargs

    In the following signature, only the params b and c will be recognized,
    so clear_cached_func_some won't work properly:

    def func(a, /, b, c, *args, d, **kwargs)
    """

    # make sure the function was decorated with the `cache_key_with_full_args`
    if not getattr(func, "cache_key_with_full_args", False):
        raise ValueError(
            f"To be able to call clear_cached_func_some on the function {_funcname(func)}, it must be decorated with the option `cache_key_with_full_args`"
        )

    # introspect func to know which args were not passed
    signature_arg_names = get_arg_names(func)
    signature_args_len = len(signature_arg_names)

    args_len = len(args)
    omitted_args_ids: list[int] = []
    for i in range(signature_args_len):
        if i < args_len:
            pass  # argument passed via *args
        elif signature_arg_names[i] in kwargs:
            pass  # argument passed via **kwargs
        else:
            omitted_args_ids.append(i)

    cache_key_pattern = cast(
        "str",
        func.make_cache_key(*args, **kwargs, __omitted_args_ids__=omitted_args_ids),
    )

    # Extract funcname and the pattern for use with CACHED_FUNC_KEYS sets
    funcname = _funcname(func)
    funcname_prefix = f"{funcname}-"

    if not cache_key_pattern.startswith(funcname_prefix):
        raise ValueError(
            f"Cache key pattern '{cache_key_pattern}' does not start with expected funcname prefix '{funcname_prefix}'"
        )

    # Remove the funcname prefix to get the funcname filter pattern
    funcname_filter = cache_key_pattern[len(funcname_prefix) :]

    return _delete_shared_then_local_cache_patterns_async(
        [(funcname, funcname_filter)]
    )
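
A polling sketch (get_prices(product_id, currency) is hypothetical and must have been decorated with the cache_key_with_full_args option; omitting currency deletes the entries for every cached currency):

>>> import time
>>> deleted, finished = alan_cache.clear_cached_func_some(get_prices, 42)
>>> while not finished:
...     time.sleep(1)  # deletion is asynchronous; poll until it reports completion
...     deleted, finished = alan_cache.clear_cached_func_some(get_prices, 42)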

default_timeout property

default_timeout

delete

delete(key)

Delete key from the cache.

Parameters:

  • key (str, required): The key to delete.

Returns:

  • bool: Whether the key existed (in at least one cache layer) and has been deleted.

Source code in shared/caching/cache.py
def delete(self, key: str) -> bool:
    """
    Delete key from the cache.

    Args:
        key: The key to delete.

    Returns:
        Whether the key existed (in at least one cache layer) and has been deleted.
    """
    res = self.local_cache.delete(key)
    res2 = self.local_cache_no_serializer.delete(key)
    res3 = self.shared_cache.delete(key)
    res4 = self.shared_cache_atomic.delete(key)
    return res or res2 or res3 or res4

delete_from_funcname

delete_from_funcname(funcname, local_ram_cache_only=False)

Deletes all the cache keys (local and Redis) for the given funcname.

Source code in shared/caching/cache.py
def delete_from_funcname(
    self, funcname: str, local_ram_cache_only: bool = False
) -> None:
    """
    Deletes all the cache keys (local and Redis) for the given funcname.
    """
    # delete local cache keys immediately
    deleted_local_keys = _delete_local_cache_keys_from_patterns([f"{funcname}-.*"])

    if local_ram_cache_only:
        return

    # immediately delete in the shared cache the keys that we deleted in the local cache
    for key in deleted_local_keys:
        self.shared_cache.delete(key)

    # Maybe more related keys are still in the shared cache, so we need to
    # delete them using the pattern. But it may take a while, so we delete
    # the shared cache async, it'll also trigger the local cache of all
    # other workers to be deleted, maximum 5 minutes after the shared cache

    _delete_shared_then_local_cache_patterns_async([(funcname, "*")])
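
A usage sketch (the funcname is the function's fully qualified name, used as its cache key prefix; hypothetical here):

>>> alan_cache.delete_from_funcname("myapp.services.get_user_data")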

delete_many

delete_many(*keys)

Deletes multiple keys at once.

Parameters:

  • keys (str): The function accepts multiple keys as positional arguments.

Returns:

  • list[str]: A list containing the successfully deleted unique keys from any of the cache layers.

Source code in shared/caching/cache.py
def delete_many(self, *keys: str) -> list[str]:
    """
    Deletes multiple keys at once.

    Args:
        keys: The function accepts multiple keys as positional arguments.

    Returns:
        A list containing successfully deleted unique keys from any of the cache layers
    """
    # we need to reimplement it from the ground up since cachelib
    # doesn't provide a default implementation.
    deleted_keys: set[str] = set()
    for key in keys:
        if self.local_cache.delete(key):
            deleted_keys.add(key)
        if self.local_cache_no_serializer.delete(key):
            deleted_keys.add(key)
        if self.shared_cache.delete(key):
            deleted_keys.add(key)
        if self.shared_cache_atomic.delete(key):
            deleted_keys.add(key)

    return list(deleted_keys)

disable_cache class-attribute instance-attribute

disable_cache = False

get

get(key)

Look up key in the cache and return the value for it.

Parameters:

  • key (str, required): The key to be looked up.

Returns:

  • Any: The value if it exists and is readable, else None.

Source code in shared/caching/cache.py
def get(self, key: str) -> Any:
    """
    Look up key in the cache and return the value for it.

    Args:
        key: The key to be looked up.

    Returns:
        The value if it exists and is readable, else None.
    """
    if self.local_cache.has(key):
        return self.local_cache.get(key)
    elif self.local_cache_no_serializer.has(key):
        return self.local_cache_no_serializer.get(key)
    elif self.shared_cache.has(key):
        return self.shared_cache.get(key)

    return self.shared_cache_atomic.get(key)

get_many

get_many(*keys)

Returns a list of values for the given keys.

For each key an item in the list is created::

foo, bar = cache.get_many("foo", "bar")

Has the same error handling as :meth:get.

Parameters:

  • keys (str): The function accepts multiple keys as positional arguments.
Source code in shared/caching/cache.py
def get_many(self, *keys: str) -> list:  # type: ignore[type-arg]
    """
    Returns a list of values for the given keys.

    For each key an item in the list is created::

        foo, bar = cache.get_many("foo", "bar")

    Has the same error handling as :meth:`get`.

    Args:
        keys: The function accepts multiple keys as positional arguments.
    """
    return [self.get(key) for key in keys]

has

has(key)

Checks if a key exists in any of the cache layers, without returning it.

This is a cheap operation that bypasses loading the actual data on the backend.

Parameters:

  • key (str, required): The key to check.
Source code in shared/caching/cache.py
def has(self, key: str) -> bool:
    """
    Checks if a key exists in any of the cache layers, without returning it.

    This is a cheap operation that bypasses loading the actual data on the backend.

    Args:
        key: The key to check
    """

    return (
        self.local_cache.has(key)
        or self.local_cache_no_serializer.has(key)
        or self.shared_cache.has(key)
        or self.shared_cache_atomic.has(key)
    )

init_from_config_obj

init_from_config_obj(config_obj, config)
Source code in shared/caching/cache.py
def init_from_config_obj(
    self, config_obj: DefaultConfig, config: dict[str, Any]
) -> None:
    merged_config = _config_to_cache_config_dict(config_obj) | config
    self._init_from_config(merged_config)

local_cache instance-attribute

local_cache = Cache()

local_cache_no_serializer instance-attribute

local_cache_no_serializer = Cache()

redis instance-attribute

redis = redis

set

set(key, value, expiration)

Set a new key/value to the cache (overwrites value, if key already exists in the cache).

Parameters:

  • key (str, required): The key to set.
  • value (Any, required): The value for the key.
  • expiration (timedelta, required): The cache expiration for the key (if not specified, the default timeout is used). An expiration of 0 indicates that the cache never expires.
Source code in shared/caching/cache.py
def set(self, key: str, value: Any, expiration: timedelta) -> None:
    """
    Set a new key/value to the cache (overwrites value, if key already exists in the cache).

    Args:
        key: The key to set
        value: The value for the key
        expiration: The cache expiration for the key (if not specified, it uses the default timeout).
            An expiration of 0 indicates that the cache never expires.
    """
    self.shared_cache.set(key, value, to_seconds(expiration))
    self.local_cache.set(key, value, to_seconds(expiration))

shared_cache instance-attribute

shared_cache = Cache()

shared_cache_atomic instance-attribute

shared_cache_atomic = Cache()

shared.caching.cache.CACHED_FUNCS module-attribute

CACHED_FUNCS = 'cached_funcs'

shared.caching.cache.CACHED_FUNCS_TO_DELETE module-attribute

CACHED_FUNCS_TO_DELETE = 'cached_funcs_to_delete'

shared.caching.cache.CACHED_FUNCS_TO_REFRESH module-attribute

CACHED_FUNCS_TO_REFRESH = 'cached_funcs_to_refresh'

shared.caching.cache.CACHED_FUNCS_TO_REFRESH_LAST_RUN_FINISHED module-attribute

CACHED_FUNCS_TO_REFRESH_LAST_RUN_FINISHED = (
    "cached_funcs_to_refresh_last_run_finished"
)

shared.caching.cache.CACHED_FUNC_KEYS_SET_ALPHA_SUFFIX module-attribute

CACHED_FUNC_KEYS_SET_ALPHA_SUFFIX = '_alpha'

shared.caching.cache.CACHED_FUNC_KEYS_SET_DEFAULT_NAME module-attribute

CACHED_FUNC_KEYS_SET_DEFAULT_NAME = 'default'

shared.caching.cache.CACHED_FUNC_KEYS_SET_PREFIX module-attribute

CACHED_FUNC_KEYS_SET_PREFIX = 'cached_func_keys_'

shared.caching.cache.CACHED_FUNC_KEYS_SET_SIZE_SUFFIX module-attribute

CACHED_FUNC_KEYS_SET_SIZE_SUFFIX = '_size'

shared.caching.cache.DEFAULT_WARMUP_STARTUP_TIMEOUT module-attribute

DEFAULT_WARMUP_STARTUP_TIMEOUT = timedelta(seconds=10)

shared.caching.cache.DELETION_CHECK_FREQUENCY_SECS module-attribute

DELETION_CHECK_FREQUENCY_SECS = 60 * 5

shared.caching.cache.FLASK_CACHING_KEY_PREFIX module-attribute

FLASK_CACHING_KEY_PREFIX = 'flask_cache_'

shared.caching.cache.FuncnameFilterPairs module-attribute

FuncnameFilterPairs = list[tuple[str, str]]

shared.caching.cache.REFRESH_EVERY_GRANULARITY module-attribute

REFRESH_EVERY_GRANULARITY = timedelta(minutes=5)

shared.caching.cache.alan_cache module-attribute

alan_cache = AlanCache()

shared.caching.cache.cached

cached(
    *,
    unless=None,
    expire_in=None,
    expire_when=None,
    local_ram_cache_only=False,
    shared_redis_cache_only=False,
    cache_key_prefix="",
    cache_key_with_request_path=False,
    cache_key_with_query_string=False,
    cache_key_with_func_args=True,
    cache_key_with_full_args=False,
    ignore_self=False,
    ignore_cls=False,
    args_to_ignore=None,
    cache_none_values=False,
    async_compute=False,
    async_refresh_every=None,
    warmup_on_startup=False,
    on_cache_computed=None,
    async_compute_job_timeout=None,
    warmup_timeout=DEFAULT_WARMUP_STARTUP_TIMEOUT,
    atomic_writes=False,
    no_serialization=False
)

Decorator. Use this to cache the return value of a function.

See @cached_for for all the details.
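
A minimal sketch, equivalent to @cached_for(minutes=10) (load_settings and compute_settings are hypothetical):

>>> @cached(expire_in=timedelta(minutes=10))
... def load_settings():
...     return compute_settings()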

Source code in shared/caching/cache.py
def cached(
    *,
    unless: Callable[..., bool] | None = None,
    expire_in: timedelta | None = None,  # relative timeout
    expire_when: Literal["object_is_destroyed"]
    | None = None,  # expire based on conditions
    local_ram_cache_only: bool = False,  # only use local instance RAM cache (not Redis)
    shared_redis_cache_only: bool = False,  # only use shared Redis cache (not local RAM)
    cache_key_prefix: str
    | Callable[..., str] = "",  # specify the base key, defaults to ""
    cache_key_with_request_path: bool = False,  # add request path to cache key
    cache_key_with_query_string: bool = False,  # add query parameters to cache key
    cache_key_with_func_args: bool = True,  # add the function arguments to cache key
    cache_key_with_full_args: bool = False,  # fully memoize the function arguments
    ignore_self: bool = False,
    ignore_cls: bool = False,
    args_to_ignore: list[str]
    | None = None,  # arguments to ignore when building the cache key with cache_key_with_func_args set to True
    cache_none_values: bool = False,  # cache None values. Incompatible with async_compute
    async_compute: bool = False,  # use a worker job to build the value, returning None while it's being built
    async_refresh_every: timedelta
    | None = None,  # refresh cache value periodically, implies cache_key_with_request_path, cache_key_with_query_string, cache_key_with_func_args set to False
    warmup_on_startup: bool = False,  # implies cache_key_with_request_path, cache_key_with_query_string, cache_key_with_func_args set to False
    on_cache_computed: Callable[[str, Any], Any]
    | None = None,  # callback called after value computed, before it's stored in the cache. takes cache_key, computed_value, returns the (possibly changed) value. If value is None, it won't be stored in the cache (unless cache_none_values is True)
    async_compute_job_timeout: timedelta | None = None,
    warmup_timeout: timedelta = DEFAULT_WARMUP_STARTUP_TIMEOUT,
    atomic_writes: Literal[False]
    | Literal["at_least_once"]
    | Literal["at_most_once"] = False,
    no_serialization: bool | None = False,
) -> Callable[[Callable[_P, _T]], Callable[_P, _T]]:
    """
    Decorator. Use this to cache the return value of a function.

    See @cached_for for all the details.
    """

    if args_to_ignore is None:
        args_to_ignore = []
    if ignore_self:
        args_to_ignore.append("self")
    if ignore_cls:
        args_to_ignore.append("cls")

    timeout: int | None = None
    if expire_in:
        timeout = to_seconds(expire_in)

    if local_ram_cache_only and atomic_writes:
        raise ValueError(
            "atomic_writes is not compatible with local_ram_cache_only=True"
        )

    if async_refresh_every is not None:
        if to_seconds(async_refresh_every) <= 0:
            raise ValueError(
                "async_refresh_every must be a strictly positive timedelta"
            )
        # from here on, a non-None async_refresh_every is guaranteed strictly positive (hence truthy)
        if async_refresh_every < REFRESH_EVERY_GRANULARITY:
            raise ValueError(
                f"async_refresh_every must be greater than {REFRESH_EVERY_GRANULARITY}, the refresh granularity"
            )
        if to_seconds(async_refresh_every) > (timeout or alan_cache.default_timeout):
            raise ValueError(
                f"async_refresh_every must be smaller than expire_in or than the default timeout ({alan_cache.default_timeout} seconds)"
            )

    if warmup_on_startup or async_refresh_every:
        cache_key_with_func_args = False

    if async_refresh_every and (
        cache_key_with_request_path
        or cache_key_with_query_string
        or cache_key_with_func_args
        or cache_none_values
    ):
        raise ValueError(
            "async_refresh_every require async_compute and is not compatible with cache_key_with_request_path, cache_key_with_query_string, cache_key_with_func_args or cache_none_values"
        )

    if warmup_on_startup and (
        cache_key_with_request_path
        or cache_key_with_query_string
        or cache_key_with_func_args
    ):
        raise ValueError(
            "warmup_on_startup is not compatible with cache_key_with_request_path, cache_key_with_query_string, cache_key_with_func_args"
        )

    if async_compute and cache_none_values:
        raise ValueError("async_compute is not compatible with cache_none_values")

    def decorator(func: Callable[..., _T]) -> Callable[..., _T]:
        funcname = _funcname(func)
        if "<locals>" in funcname:
            raise ValueError(
                "Can't cache a function with a local name as this may lead to cache key collision. "
                "This is likely due to a downwards decorator missing @functool.wraps."
            )
        sig_args_len = len(get_arg_names(func))

        # perform checks that depends on the function
        if (warmup_on_startup or async_refresh_every) and _wants_args(func):
            raise ValueError(
                "warmup_on_startup and async_refresh_every can't be used on a function that takes args"
            )

        def make_cache_key(*args, **kwargs):  # type: ignore[no-untyped-def]
            omitted_args_ids: list[int] = kwargs.pop("__omitted_args_ids__", [])
            parts: list[Any] = []
            # make sure we have a value for all parts of the key otherwise we
            # could have key collisions
            args_part: tuple = ()  # type: ignore[type-arg]
            kwargs_part: OrderedDict = OrderedDict()  # type: ignore[type-arg]
            if cache_key_with_func_args:
                # add function arguments to cache key
                (
                    args_part,
                    kwargs_part,
                ) = alan_cache.shared_cache._memoize_kwargs_to_args(
                    func, *args, args_to_ignore=args_to_ignore, **kwargs
                )
            if cache_key_with_full_args:
                # Make sure we have a value for all args, even the ones not given, using None as default
                parts.extend(list(args_part) + [None] * (sig_args_len - len(args_part)))
            else:
                parts.append(args_part)
            parts.append(kwargs_part)
            req_path_part = ""
            if cache_key_with_request_path and has_request_context():
                req_path_part = request.path
            parts.append(req_path_part)
            req_string_part = []
            if cache_key_with_query_string and has_request_context():
                req_string_part = sorted(request.args.items(multi=True))
            parts.append(req_string_part)
            parts = [_encode(p) for p in parts]  # type: ignore[no-untyped-call]
            if omitted_args_ids:
                # replace args that were omitted from the cache key with a
                # globbing Redis pattern placeholder
                for omitted_args_id in omitted_args_ids:
                    parts[omitted_args_id] = "*"

            # prepend the cache key prefix to the key
            if callable(cache_key_prefix):
                if _wants_args(cache_key_prefix):
                    base_key = cache_key_prefix(func, *args, **kwargs)
                else:
                    base_key = cache_key_prefix()
            else:
                base_key = cache_key_prefix
            if len(base_key) > 0:
                parts.insert(0, base_key)
            # always prepend the func name, so that we can list/purge cache
            # based on it
            parts.insert(0, funcname)
            cache_key = "-".join(parts)
            return cache_key

        func0 = func

        if on_cache_computed is not None:

            @functools.wraps(func)
            def _func_with_callback(*args, **kwargs):  # type: ignore[no-untyped-def]
                key = make_cache_key(*args, **kwargs)  # type: ignore[no-untyped-call]
                value = func(*args, **kwargs)
                return on_cache_computed(key, value)

            func0 = _func_with_callback

        func1 = func0
        if async_compute:
            # In async_compute mode we want that when the original function is
            # called, it's not run in sync mode, but instead it raises
            # AsyncValueBeingBuiltException, and the cache is rebuilt
            # asynchronously.

            # In async mode, if func is a method, RQ can't cope with loading
            # 'module.class.method.sync', because there are 2 levels of
            # attributes, and `rq.utils.import_attribute` copes with only one
            # level of attributes (see its code). So we create an additional
            # sync mode method at the same level, in this example
            # 'module.class._sync_method'

            # One additional subtlety, we need to add support for recursive
            # async functions: if the function to run in async mode calls
            # itself an async function, it's going to fail with
            # AsyncValueBeingBuiltException, and the job will fail and
            # RuntimeError will be raised. Instead, we use
            # alan_cache._running_in_an_async_worker to force only one level of
            # async computation

            @functools.wraps(func)
            def _sync_func(*args, **kwargs):  # type: ignore[no-untyped-def]
                alan_cache._running_in_an_async_worker += 1  # noqa: ALN027
                try:
                    ret = func(*args, **kwargs)
                except BaseException:
                    raise
                finally:
                    alan_cache._running_in_an_async_worker -= 1  # noqa: ALN027
                return ret

            setattr(getmodule(func), f"_sync_{func.__qualname__}", _sync_func)

            @functools.wraps(func)
            def func_async(*args, **kwargs):  # type: ignore[no-untyped-def]
                if alan_cache._running_in_an_async_worker > 0:  # noqa: ALN027
                    # we are already running in an async worker, don't start a
                    # new one, stay sync
                    return func(*args, **kwargs)

                cache_key = make_cache_key(*args, **kwargs)  # type: ignore[no-untyped-call]

                # late import to avoid import recursion
                from shared.helpers.asynchronous.computation import compute_async_result

                # compute_async_result raises AsyncValueBeingBuiltException
                # while the job runs, which is good for us: the cache won't be
                # populated (when getting), until we have a result
                async_value: _T = compute_async_result(
                    f"{func.__module__}._sync_{func.__qualname__}",
                    *args,
                    unique_id=cache_key,
                    job_timeout=(
                        int(async_compute_job_timeout.total_seconds())
                        if async_compute_job_timeout
                        else None
                    ),
                    result_ttl=timeout,
                    queue_name=CACHE_BUILDER_QUEUE,
                    **kwargs,
                )

                # if we reach here, we've successfully computed the value async,
                # using the original func, without on_cache_computed. Apply it
                # here, so that it's executed on the web worker, not on the rq
                # worker
                if on_cache_computed:
                    async_value = on_cache_computed(cache_key, async_value)

                return async_value

            func1 = func_async

        func2 = func1

        # if needed, enable forcing updating the cache when passing `_force_cache_update`
        forced_update = None
        if async_refresh_every or warmup_on_startup:

            def forced_update_cb(  # type: ignore[no-untyped-def]
                *args,  # noqa: ARG001
                _force_cache_update: bool = False,
                **kwargs,  # noqa: ARG001
            ) -> bool:
                return _force_cache_update

            forced_update = forced_update_cb

            # remove _force_cache_update from the args
            @functools.wraps(func)
            def func_with_force_update_arg(  # type: ignore[no-untyped-def]
                *args, _force_cache_update: bool = False, **kwargs
            ):
                return func1(*args, **kwargs)

            func2 = func_with_force_update_arg

        func3 = func2
        if async_refresh_every is not None:

            @functools.wraps(func)
            def func_with_registering_last_run_timestamp(*args, **kwargs):  # type: ignore[no-untyped-def]
                # if async_refresh_every and async_compute is also True
                # func2 will raise AsyncValueBeingBuiltException while retval is
                # being computed
                if async_refresh_every is not None:
                    alan_cache.redis.hset(
                        CACHED_FUNCS_TO_REFRESH,
                        funcname,
                        to_seconds(async_refresh_every),  # type: ignore[arg-type]
                    )

                retval = func2(*args, **kwargs)
                # only when the value is computed, we register the last run
                alan_cache.redis.hset(
                    CACHED_FUNCS_TO_REFRESH_LAST_RUN_FINISHED,
                    funcname,
                    int(time.time()),  # type: ignore[arg-type]
                )
                return retval

            func3 = func_with_registering_last_run_timestamp

        # Finally, we decorate the function with one or two layer of caching
        func4 = func3
        orig_func3 = func3
        if not local_ram_cache_only:
            shared_cache = alan_cache.shared_cache
            if atomic_writes:
                shared_cache = alan_cache.shared_cache_atomic
                if atomic_writes == "at_most_once":
                    # we want at most once, so we don't call the original
                    # computation method, instead we store the PID + thread_id,
                    # then we'll see who wins, and only this process will call
                    # the original computation method

                    @functools.wraps(func)
                    def _build_at_most_once_lock(*args, **kwargs) -> str:  # type: ignore[no-untyped-def]  # noqa: ARG001
                        return f"__atomic_lock_proc:{_get_proc_thread_id()}"

                    orig_func3 = func3
                    func3 = _build_at_most_once_lock  # type: ignore[assignment]

            tags1 = ["cache_type:shared_redis", f"async:{async_compute}"]
            func3_with_metric1 = _wrap_with_metrics(func, func3, "set", tags1)
            # apply the shared_cache decorator to func
            func3_cache_shared = shared_cache.cached(
                timeout=timeout,
                make_cache_key=make_cache_key,
                unless=unless,
                forced_update=forced_update,
                cache_none=cache_none_values,
            )(func3_with_metric1)

            func3_handle_atomic_conflict = func3_cache_shared

            if atomic_writes:

                def _retry_on_watch_exception(*args, **kwargs):  # type: ignore[no-untyped-def]
                    retval = None
                    while True:
                        try:
                            retval = func3_cache_shared(*args, **kwargs)
                            break
                        except WatchError:
                            continue
                    return retval

                func3_handle_atomic_conflict = _retry_on_watch_exception

            func4 = func3_handle_atomic_conflict
            if atomic_writes and atomic_writes == "at_most_once":
            # Here we handle replacing the PID by the computed value (see
                # above comment)

                @functools.wraps(func)
                def set_real_value_after_lock_is_set(*args, **kwargs):  # type: ignore[no-untyped-def]
                    key = make_cache_key(*args, **kwargs)  # type: ignore[no-untyped-call]
                    # TODO: time out after too long
                    while True:
                        retval = func3_handle_atomic_conflict(*args, **kwargs)
                        retval_str = str(retval or "")
                        if not retval_str.startswith("__atomic_lock_proc:"):
                            # the other process has finished, return the value
                            return retval
                        proc_thread_id = retval_str.split(":")[1]
                        if proc_thread_id == _get_proc_thread_id():
                            # it's us! set the cache and return the val
                            computed_val = orig_func3(*args, **kwargs)
                            alan_cache.set(
                                key, computed_val, expire_in or timedelta(seconds=0)
                            )
                            return computed_val
                        # it's another process, wait for it to finish
                        time.sleep(_MINI_SLEEP_TIME_SEC)

                func4 = set_real_value_after_lock_is_set

        func5 = func4
        if not shared_redis_cache_only:
            local_cache = alan_cache.local_cache
            if no_serialization:
                local_cache = alan_cache.local_cache_no_serializer
            tags2 = ["cache_type:local_ram", f"async:{async_compute}"]
            _with_metric2 = _wrap_with_metrics(func, func4, "set", tags2)
            # apply the local_cache decorator to the maybe already decorated func
            _with_local_cache = local_cache.cached(
                timeout=timeout,
                make_cache_key=make_cache_key,
                unless=unless,
                forced_update=forced_update,
                cache_none=cache_none_values,
            )(_with_metric2)

            # wrap with a function that will check every 5 min if we need to delete local cache keys
            @functools.wraps(func)
            def _func_with_cleanup_local_cache(*args, **kwargs):  # type: ignore[no-untyped-def]
                if not isinstance(alan_cache.local_cache.cache, SimpleCache):
                    return _with_local_cache(*args, **kwargs)

                global _last_time_check_for_deletion

                now = datetime.now(UTC)
                interval = now - _last_time_check_for_deletion
                if interval.total_seconds() >= DELETION_CHECK_FREQUENCY_SECS:
                    _cleanup_local_cache_keys()

                return _with_local_cache(*args, **kwargs)

            func5 = _func_with_cleanup_local_cache

        func6 = _wrap_with_metrics(func, func5, "get", [f"async:{async_compute}"])

        # We do that instead of using a NullCache backend if disable_cache is
        # True, because NullCache still requires to init_app the cache, whereas
        # with this wrapper we don't need it
        @functools.wraps(func)
        def _wrap_with_disable_cache_and_register(*args, **kwargs) -> _T:  # type: ignore[no-untyped-def]
            if _bypass_cache(unless, func, *args, **kwargs):
                # maybe remove this arg as original func doesn't support it
                kwargs.pop("_force_cache_update", None)
                return func(*args, **kwargs)
            # register the funcname in Redis for easy listing/deleting of cache keys
            if funcname not in _registered_funcnames:
                alan_cache.redis.sadd(CACHED_FUNCS, funcname)
                _registered_funcnames.add(funcname)

            return func6(*args, **kwargs)  # type: ignore[no-any-return]

        func7 = _wrap_with_disable_cache_and_register

        if warmup_on_startup and current_app:

            @current_app.before_first_request
            def _warmup_cache():  # type: ignore[no-untyped-def]
                if not alan_cache.disable_cache:
                    timeout_end = time.monotonic() + warmup_timeout.total_seconds()
                    while True:
                        try:
                            func7(_force_cache_update=True)
                            break
                        except AsyncValueBeingBuiltException:
                            pass
                        if time.monotonic() > timeout_end:
                            raise TimeoutError(
                                f"warming up cache value took more than {warmup_timeout}"
                            )
                        time.sleep(_SHORT_SLEEP_TIME_SEC)

        func8 = func7
        if expire_when == "object_is_destroyed":
            # collect keys per class and instance, and a destructor on the class
            # to delete cache keys
            @functools.wraps(func)
            def _wrap_with_clear_at_object_destruction(self, *args, **kwargs) -> _T:  # type: ignore[no-untyped-def]
                cls = type(self)
                class_name = cls.__module__ + "." + cls.__qualname__
                if class_name not in _destructor_added:
                    existing_destructor = getattr(cls, "__del__", None)

                    def destructor(self):  # type: ignore[no-untyped-def]
                        keys = _instance_keys.get(class_name, {}).get(str(self), set())
                        alan_cache.delete_many(*keys)
                        _instance_keys.get(class_name, {}).pop(str(self), None)
                        if existing_destructor is not None:
                            return existing_destructor(self)
                        return

                    setattr(cls, "__del__", destructor)  # noqa: B010
                    _destructor_added.add(class_name)
                key = make_cache_key(self, *args, **kwargs)  # type: ignore[no-untyped-call]
                _instance_keys.setdefault(class_name, {}).setdefault(
                    str(self), set()
                ).add(key)
                return func7(self, *args, **kwargs)

            func8 = _wrap_with_clear_at_object_destruction

        def _clear_cached_func_all() -> None:
            alan_cache.clear_cached_func_all(func8)

        setattr(func8, "local_ram_cache_only", local_ram_cache_only)  # noqa: B010
        setattr(func8, "make_cache_key", make_cache_key)  # noqa: B010
        setattr(func8, "uncached", func)  # noqa: B010
        setattr(func8, "expire_in", expire_in)  # noqa: B010
        setattr(func8, "clear_cache", _clear_cached_func_all)  # noqa: B010
        setattr(func8, "cache_key_with_full_args", cache_key_with_full_args)  # noqa: B010

        if async_compute:
            # Also provide a "run_sync" function as attribute of the decorated
            # func, which forces running the code in sync mode
            def _forced_sync_mode(*args, **kwargs):  # type: ignore[no-untyped-def]
                cache_key = make_cache_key(*args, **kwargs)  # type: ignore[no-untyped-call]
                if alan_cache.has(cache_key):
                    return alan_cache.get(cache_key)
                else:
                    alan_cache._running_in_an_async_worker += 1  # noqa: ALN027
                    val = func8(*args, **kwargs)
                    alan_cache._running_in_an_async_worker -= 1  # noqa: ALN027
                    return val

            setattr(func8, "run_sync", _forced_sync_mode)  # noqa: B010

        return func8

    return decorator

shared.caching.cache.cached_for

cached_for(
    *,
    weeks=0,
    days=0,
    hours=0,
    minutes=0,
    seconds=0,
    **kwargs
)

Decorator for caching function results with time-based expiration.

This is a thin wrapper around @cached that provides a more intuitive interface for specifying cache expiration times. It supports both local RAM caching and shared Redis caching with extensive configuration options.

Parameters:

  • weeks (int): Number of weeks until cache expiration. Defaults to 0.
  • days (int): Number of days until cache expiration. Defaults to 0.
  • hours (int): Number of hours until cache expiration. Defaults to 0.
  • minutes (int): Number of minutes until cache expiration. Defaults to 0.
  • seconds (int): Number of seconds until cache expiration. Defaults to 0.
  • expire_when (Literal["object_is_destroyed"] | None): Conditional expiration trigger. Defaults to None. Option "object_is_destroyed" expires the cache when the class instance is destroyed.
  • unless (Callable[..., bool] | None): Callback that receives (func, *args, **kwargs). If it returns True, the value won't be cached. Defaults to None.
  • local_ram_cache_only (bool): If True, cache only in local RAM, not Redis. Defaults to False.
  • shared_redis_cache_only (bool): If True, cache only in Redis, not local RAM. Defaults to False.
  • no_serialization (bool | None): If True, store values as-is without serialization. Use with local_ram_cache_only=True for ORM objects. Defaults to False.
  • atomic_writes (Literal[False] | Literal["at_least_once"] | Literal["at_most_once"]): Prevents race conditions. Defaults to False. With "at_least_once", the function runs at least once and may run multiple times; with "at_most_once", it runs at most once across all processes.
  • cache_key_prefix (str | Callable[..., str]): String or callback for custom cache key prefixes. If a callback, it receives (func, *args, **kwargs) or no arguments. Defaults to "".
  • cache_key_with_request_path (bool): If True, include the request path in the cache key. Defaults to False.
  • cache_key_with_query_string (bool): If True, include query parameters in the cache key. Defaults to False.
  • cache_key_with_func_args (bool): If True, include function arguments in the cache key. Defaults to True.
  • cache_key_with_full_args (bool): If True, hash arguments separately. Required for using clear_cached_func_some for partial cache invalidation. Defaults to False.
  • args_to_ignore (list[str] | None): List of argument names to exclude from the cache key when cache_key_with_func_args is True. Defaults to None.
  • ignore_self (bool): If True, ignore the 'self' parameter in the cache key (equivalent to adding 'self' to args_to_ignore). Defaults to False.
  • ignore_cls (bool): If True, ignore the 'cls' parameter in the cache key (equivalent to adding 'cls' to args_to_ignore). Defaults to False.
  • cache_none_values (bool): If True, cache None values. Incompatible with async_compute. Defaults to False.
  • async_compute (bool): If True, run the function in a background worker. Raises AsyncValueBeingBuiltException while computing. Defaults to False.
  • async_refresh_every (timedelta | None): Refresh the cache periodically. Requires a minimum of 5 minutes and forces cache_key_with_request_path and cache_key_with_query_string to False. Defaults to None.
  • warmup_on_startup (bool): If True, pre-populate the cache on application startup. Requires cache_key_with_request_path and cache_key_with_query_string to be False. Defaults to False.
  • on_cache_computed (Callable[[str, Any], Any] | None): Callback called after function execution with (cache_key, value). Should return the (possibly modified) value, or None to prevent caching. Defaults to None.
  • async_compute_job_timeout (timedelta | None): Maximum time allowed for async cache computation. Defaults to None.
  • warmup_timeout (timedelta): Maximum time allowed for cache warmup. Defaults to 10 seconds. Long warmup times can slow application startup.

Returns:

  • Callable[[Callable[_P, _T]], Callable[_P, _T]]: A decorator that wraps the target function with caching behavior.

Examples:

Basic time-based caching:

>>> @cached_for(minutes=30)
... def get_user_data(user_id: int):
...     return fetch_user_from_database(user_id)

Local RAM caching only:

>>> @cached_for(minutes=5, local_ram_cache_only=True)
... def get_session_data(session_id: str):
...     return fetch_session_data(session_id)

Async cache computation:

>>> @cached_for(hours=2, async_compute=True)
... def generate_report(report_id: int):
...     return create_expensive_report(report_id)

Conditional caching:

>>> @cached_for(minutes=30, unless=lambda func, user_id, *args, **kwargs: user_id is None)
... def get_user_preferences(user_id: int | None):
...     return fetch_preferences(user_id) if user_id else get_defaults()
Note
  • Equivalent to @cached(expire_in=timedelta(...), **kwargs)
  • The decorated function gains these attributes for introspection:
    • make_cache_key: Function to generate cache keys
    • uncached: Original function without caching
    • expire_in: Cache expiration timedelta or None
  • Async computation requires RQ workers running for CACHE_BUILDER_QUEUE
  • Atomic writes only work with Redis backend, not local RAM cache
  • Use cache_key_with_full_args=True for partial cache invalidation support
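
A sketch of the introspection attributes listed above, reusing the hypothetical get_user_data from the examples:

>>> key = get_user_data.make_cache_key(42)  # the cache key used for this call
>>> get_user_data.uncached(42)              # call the original function, bypassing the cache
>>> get_user_data.clear_cache()             # drop every cached value of this function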
Source code in shared/caching/cache.py
def cached_for(  # type: ignore[no-untyped-def]
    *,
    weeks: int = 0,
    days: int = 0,
    hours: int = 0,
    minutes: int = 0,
    seconds: int = 0,
    **kwargs,
) -> Callable[[Callable[_P, _T]], Callable[_P, _T]]:
    """
    Decorator for caching function results with time-based expiration.

    This is a thin wrapper around @cached that provides a more intuitive interface
    for specifying cache expiration times. It supports both local RAM caching and
    shared Redis caching with extensive configuration options.

    Args:
        weeks (int, optional): Number of weeks until cache expiration. Defaults to 0.
        days (int, optional): Number of days until cache expiration. Defaults to 0.
        hours (int, optional): Number of hours until cache expiration. Defaults to 0.
        minutes (int, optional): Number of minutes until cache expiration. Defaults to 0.
        seconds (int, optional): Number of seconds until cache expiration. Defaults to 0.
        expire_when (Literal["object_is_destroyed"] | None, optional): Conditional expiration trigger. Defaults to None. Options:
            - "object_is_destroyed": Cache expires when class instance is destroyed
        unless (Callable[..., bool] | None, optional): Callback that receives (func, *args, **kwargs). If returns True,
            the value won't be cached. Defaults to None.
        local_ram_cache_only (bool, optional): If True, cache only in local RAM, not Redis. Defaults to False.
        shared_redis_cache_only (bool, optional): If True, cache only in Redis, not local RAM. Defaults to False.
        no_serialization (bool | None, optional): If True, store values as-is without serialization.
            Use with local_ram_cache_only=True for ORM objects. Defaults to False.
        atomic_writes (Literal[False] | Literal["at_least_once"] | Literal["at_most_once"], optional): Prevents race conditions. Defaults to False. Options:
            - False: No atomic writes (default)
            - "at_least_once": Function runs at least once, may run multiple times
            - "at_most_once": Function runs at most once across all processes
        cache_key_prefix (str | Callable[..., str], optional): String or callback for custom cache key prefixes.
            If callback, receives (func, *args, **kwargs) or no args. Defaults to "".
        cache_key_with_request_path (bool, optional): If True, include request path in cache key. Defaults to False.
        cache_key_with_query_string (bool, optional): If True, include query parameters in cache key. Defaults to False.
        cache_key_with_func_args (bool, optional): If True, include function arguments in cache key.
            Defaults to True.
        cache_key_with_full_args (bool, optional): If True, hash arguments separately. Required for
            using clear_cached_func_some for partial cache invalidation. Defaults to False.
        args_to_ignore (list[str] | None, optional): List of argument names to exclude from cache key when
            cache_key_with_func_args is True. Defaults to None.
        ignore_self (bool, optional): If True, ignore 'self' parameter in cache key (equivalent to
            adding 'self' to args_to_ignore). Defaults to False.
        ignore_cls (bool, optional): If True, ignore 'cls' parameter in cache key (equivalent to
            adding 'cls' to args_to_ignore). Defaults to False.
        cache_none_values (bool, optional): If True, cache None values. Incompatible with async_compute. Defaults to False.
        async_compute (bool, optional): If True, run function in background worker. Raises
            AsyncValueBeingBuiltException while computing. Defaults to False.
        async_refresh_every (timedelta | None, optional): Refresh cache periodically. Requires minimum 5 minutes,
            forces cache_key_with_request_path and cache_key_with_query_string to False. Defaults to None.
        warmup_on_startup (bool, optional): If True, pre-populate cache on application startup.
            Requires cache_key_with_request_path and cache_key_with_query_string to be False. Defaults to False.
        on_cache_computed (Callable[[str, Any], Any] | None, optional): Callback called after function execution with (cache_key, value).
            Should return modified value or None to prevent caching. Defaults to None.
        async_compute_job_timeout (timedelta | None, optional): Maximum time allowed for async cache computation. Defaults to None.
        warmup_timeout (timedelta, optional): Maximum time allowed for cache warmup. Defaults to 10 seconds.
            Long warmup times can slow application startup.

    Returns:
        Callable: A decorator that wraps the target function with caching behavior.

    Examples:
        Basic time-based caching:

        >>> @cached_for(minutes=30)
        ... def get_user_data(user_id: int):
        ...     return fetch_user_from_database(user_id)

        Local RAM caching only:

        >>> @cached_for(minutes=5, local_ram_cache_only=True)
        ... def get_session_data(session_id: str):
        ...     return fetch_session_data(session_id)

        Async cache computation:

        >>> @cached_for(hours=2, async_compute=True)
        ... def generate_report(report_id: int):
        ...     return create_expensive_report(report_id)

        Conditional caching:

        >>> @cached_for(minutes=30, unless=lambda func, user_id, *args, **kwargs: user_id is None)
        ... def get_user_preferences(user_id: int | None):
        ...     return fetch_preferences(user_id) if user_id else get_defaults()

    Note:
        - Equivalent to @cached(expire_in=timedelta(...), **kwargs)
        - The decorated function gains these attributes for introspection:
            * make_cache_key: Function to generate cache keys
            * uncached: Original function without caching
            * expire_in: Cache expiration timedelta or None
        - Async computation requires RQ workers running for CACHE_BUILDER_QUEUE
        - Atomic writes only work with Redis backend, not local RAM cache
        - Use cache_key_with_full_args=True for partial cache invalidation support
    """
    return cached(
        expire_in=timedelta(
            weeks=weeks, days=days, hours=hours, minutes=minutes, seconds=seconds
        ),
        **kwargs,
    )

shared.caching.cache.delete_shared_then_local_cache_funcnames_async

delete_shared_then_local_cache_funcnames_async(funcnames)

Runs delete_shared_cache_funcname in async mode.

Returns a tuple indicating how many keys have been deleted so far on Redis, and whether the process is still running.

Source code in shared/caching/cache.py
def delete_shared_then_local_cache_funcnames_async(
    funcnames: list[str],
) -> tuple[int, bool]:
    """
    Runs delete_shared_cache_funcname in async mode.

    Returns a tuple indicating how many keys have been deleted so far on Redis, and whether the process is still running
    """
    funcname_patterns = [(funcname, "*") for funcname in funcnames]
    return _delete_shared_then_local_cache_patterns_async(funcname_patterns)
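
A usage sketch (the module path is hypothetical):

>>> deleted_so_far, still_running = delete_shared_then_local_cache_funcnames_async(
...     ["myapp.services.get_user_data"]
... )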

shared.caching.cache.delete_shared_then_local_cache_patterns

delete_shared_then_local_cache_patterns(
    funcnames_and_filters, intermediate_result_key_name
)

Delete all keys in the Redis cache using the CACHED_FUNC_KEYS sets for each funcname, then mark the matching keys for deletion in the local RAM caches of all instances.

Source code in shared/caching/cache.py
def delete_shared_then_local_cache_patterns(
    funcnames_and_filters: FuncnameFilterPairs,
    intermediate_result_key_name: str,
) -> int:
    """
    Delete all keys in the Redis cache using the CACHED_FUNC_KEYS sets for each funcname, then mark the matching keys for deletion in the local RAM caches of all instances
    """

    (redis, _) = get_redis_caching_connection()

    count = 0
    for funcname, filter_pattern in funcnames_and_filters:
        set_name = CACHED_FUNC_KEYS_SET_PREFIX + funcname

        if filter_pattern == "*":
            # Get all keys from the set and delete them all
            keys = list(cast("set[bytes]", redis.smembers(set_name)))
            if keys:
                count += len(keys)
                for batch_keys in group_iter(keys, 1000):
                    redis.delete(*batch_keys)
                # Remove all keys from the tracking set
                redis.delete(set_name)
        else:
            # Use sscan to find keys matching the filter pattern
            keys_to_delete: list[bytes] = []
            for key_bytes in redis.sscan_iter(
                set_name, match=f"{FLASK_CACHING_KEY_PREFIX}{funcname}-{filter_pattern}"
            ):
                keys_to_delete.append(key_bytes)

            if keys_to_delete:
                count += len(keys_to_delete)
                # Delete the cache keys themselves in batches of 1000
                for batch_keys in group_iter(keys_to_delete, 1000):
                    redis.delete(*batch_keys)
                # Remove these keys from the tracking set in batches of 1000
                for batch_keys in group_iter(keys_to_delete, 1000):
                    redis.srem(set_name, *batch_keys)

        # Update progress counter
        redis.setex(
            name=intermediate_result_key_name,
            time=timedelta(minutes=10),
            value=count,
        )

    redis.delete(intermediate_result_key_name)

    # Now, we can remove funcnames from the CACHED_FUNCS set if their sets are empty,
    # so that they're not listed in Alan Cache Eng Tool for instance
    for funcname, _ in funcnames_and_filters:
        set_name = CACHED_FUNC_KEYS_SET_PREFIX + funcname
        if redis.scard(set_name) == 0:
            # The function has no remaining cached keys, so remove it from
            # CACHED_FUNCS. Note: yes, there is a potential race condition (a
            # new key could appear between the scard and the srem), but frankly
            # we don't care: the funcname will be re-added the next time.
            redis.srem(CACHED_FUNCS, funcname)

    # Now that keys are deleted, we need to notify all workers to delete the
    # keys from their local cache as well. We do this by adding the funcname to
    # the dedicated special Redis Set CACHED_FUNCS_TO_DELETE

    # we use the Redis time instead of the Python time from the local box
    # (it's a common mistake to use local time and be prone to clock issues)
    (epoch, _) = redis.time()  # type: ignore[misc]
    # we bulk-add the patterns to CACHED_FUNCS_TO_DELETE so that other Python
    # processes eventually delete the keys in their local RAM cache as well.
    # We batch the patterns to avoid an overly long Redis command
    patterns_for_dict = [
        f"{funcname}-{filter_pattern}".replace("*", ".*")
        for funcname, filter_pattern in funcnames_and_filters
    ]
    for batched_patterns_for_dict in group_iter(patterns_for_dict, 100):
        redis.zadd(
            CACHED_FUNCS_TO_DELETE,
            dict.fromkeys(batched_patterns_for_dict, cast("int", epoch)),
        )
    # we set an expiration on the set, so that it doesn't grow indefinitely.
    # workers are recycled every 30 minutes, so we know we won't have
    # workers that didn't get the message
    redis.expire(CACHED_FUNCS_TO_DELETE, 3600)

    return count
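
For illustration, a minimal sketch of a targeted invalidation; the funcname, filter pattern, and progress key name are all hypothetical:

count = delete_shared_then_local_cache_patterns(
    funcnames_and_filters=[("get_user_preferences", "42-*")],
    intermediate_result_key_name="cache_deletion_progress_42",
)
print(f"{count} keys deleted from the shared cache")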

shared.caching.cache.memory_only_cache

memory_only_cache(func)

Bases: Generic[_T, _P]

Source code in shared/caching/cache.py
def __init__(self, func: Callable[_P, _T]) -> None:
    self.func = func
    # wrap func with a local-RAM-only cache that skips serialization
    # (objects are stored as-is) and also caches None results
    self._cached_func = cached(
        unless=lambda: self.disable_cache,
        local_ram_cache_only=True,
        cache_none_values=True,
        no_serialization=True,
    )(func)

    functools.update_wrapper(self, func)

__call__

__call__(*args, **kwargs)
Source code in shared/caching/cache.py
def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T:
    return self._cached_func(*args, **kwargs)

__get__

__get__(obj, objtype)

Support instance methods.

Source code in shared/caching/cache.py
def __get__(self, obj, objtype):  # type: ignore[no-untyped-def]
    """
    Support instance methods.
    """
    return partial(self.__call__, obj)

__repr__

__repr__()

Return the function's docstring.

Source code in shared/caching/cache.py
def __repr__(self) -> str:
    """
    Return the function's docstring.
    """
    return self.func.__doc__  # type: ignore[return-value]

clear_cache

clear_cache()
Source code in shared/caching/cache.py
def clear_cache(self) -> None:
    self._cached_func.clear_cache()  # type: ignore[attr-defined]

disable_cache class-attribute instance-attribute

disable_cache = False

func instance-attribute

func = func
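
A hedged usage sketch (the decorated function and its workload are placeholders); results are stored unserialized in local RAM only:

@memory_only_cache
def load_reference_table() -> dict:
    return {"rates": [0.1, 0.2, 0.3]}  # placeholder for an expensive computation

table = load_reference_table()      # computed once
table = load_reference_table()      # served as-is from local RAM
load_reference_table.clear_cache()  # force recomputation on the next call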

shared.caching.cache.request_cache_teardown_added module-attribute

request_cache_teardown_added = dict()

shared.caching.cache.request_cached

request_cached(
    *,
    for_http_methods=None,
    unless=None,
    cache_key_prefix=None,
    **kwargs
)

Decorator to cache at the request level, in RAM only; entries expire at the end of the request, or after 30 seconds at most.

It is a wrapper around @alan_cache.cached, with equivalent properties of:

- expire_in=timedelta(seconds=30)
- local_ram_cache_only=True
- cache_key_with_func_args=True
- cache_none_values=True

Additional arguments are passed through to @alan_cache.cached; see its documentation for more info.

Note

WARNING: cache_key_prefix cannot be overridden

Source code in shared/caching/cache.py
def request_cached(  # type: ignore[no-untyped-def]
    *,
    for_http_methods: set[Literal["GET", "POST", "PUT", "PATCH", "DELETE"]]
    | None = None,
    unless: Callable[..., bool] | None = None,
    cache_key_prefix=None,
    **kwargs: Any,
) -> Callable[[Callable[_P, _T]], Callable[_P, _T]]:
    """
    Decorator to cache at the request level, in RAM only; entries expire at the end of the request, or after 30 seconds at most.

    It is a wrapper around @alan_cache.cached, with equivalent properties of:
    - expire_in=timedelta(seconds=30)
    - local_ram_cache_only=True
    - cache_key_with_func_args=True
    - cache_none_values=True

    Additional arguments are passed through to @alan_cache.cached; see its documentation for more info.

    Note:
        WARNING: cache_key_prefix cannot be overridden
    """
    if cache_key_prefix is not None:
        raise ValueError("cache_key_prefix cannot be set when using request_cached")
    if for_http_methods is None:
        for_http_methods = {"GET"}

    http_methods: set[Literal["GET", "POST", "PUT", "PATCH", "DELETE"]] = (
        for_http_methods
    )

    def decorator(func: Callable[_P, _T]) -> Callable[_P, _T]:
        def _request_is_not_the_right_http_method(
            f: Callable,  # type: ignore[type-arg]
            *args: Any,
            **kwargs: Any,
        ) -> bool:
            # if the caller passed an "unless" callback, call it
            if unless is not None and unless(f, *args, **kwargs):
                return True
            return bool((not request) or (request.method not in http_methods))

        def cache_key_prefix() -> str:
            cache_key: str = ""
            if has_request_context():
                request_id = id(request)
                request_uuid = getattr(request, "caching_uuid", None)
                if not request_uuid:
                    request_uuid = uuid.uuid4()
                    request.caching_uuid = request_uuid  # type: ignore[attr-defined]
                cache_key = f"{request_id}-{request_uuid}"
            return cache_key

        def on_cache_computed(cache_key: str, value: Any) -> Any:
            if has_request_context():
                if getattr(request, "cache_keys", None) is None:
                    request.cache_keys = set()  # type: ignore[attr-defined]
                request.cache_keys.add(cache_key)  # type: ignore[attr-defined]
            return value

        global request_cache_teardown_added
        if current_app and not request_cache_teardown_added.get(current_app, False):

            @current_app.teardown_request
            def destroy_request_cached_entries(_response_or_exc: Any) -> None:
                # teardown_request callbacks shall not raise an exception
                try:
                    cache_keys: set[str] = getattr(request, "cache_keys", set())
                    alan_cache.delete_many(*cache_keys)
                except Exception:  # noqa: S110
                    pass

            request_cache_teardown_added[current_app] = True

        return cached(
            expire_in=timedelta(seconds=30),
            local_ram_cache_only=True,
            cache_key_with_func_args=True,
            unless=_request_is_not_the_right_http_method,
            cache_key_prefix=cache_key_prefix,
            cache_none_values=True,
            on_cache_computed=on_cache_computed,
            **kwargs,
        )(func)

    return decorator
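
A hedged sketch of the decorator in a Flask request context; the function and its helper are hypothetical:

@request_cached()
def current_user_permissions() -> set[str]:
    # expensive lookup: within a single GET request, repeated calls return
    # the same cached value; the entry is evicted at request teardown
    return load_permissions_from_db()  # hypothetical helper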

shared.caching.cache.thread_local_class_cache

thread_local_class_cache(attr_name)

Decorator to cache method results in thread-local storage.

Source code in shared/caching/cache.py
def thread_local_class_cache(
    attr_name: str,
) -> Callable[[Callable[_P2, _T2]], Callable[_P2, _T2]]:
    # TODO: merge it as an AlanCache backend maybe
    """Decorator to cache method results in thread-local storage."""

    def decorator(func: Callable[_P2, _T2]) -> Callable[_P2, _T2]:
        @wraps(func)
        def wrapper(*args: _P2.args, **kwargs: _P2.kwargs) -> _T2:
            cls = args[0]  # First argument is always the class or self
            # create a thread-local cache for the class
            if not hasattr(cls, "_thread_local_cache"):
                cls._thread_local_cache = threading.local()  # type: ignore[attr-defined]

            # compute the final key to use
            key_suffix = hashlib.md5(  # noqa: S324
                json.dumps({"args": args[1:], "kwargs": kwargs}).encode("utf-8")
            ).hexdigest()
            key = f"{attr_name}__{key_suffix}"

            # call the function if the final key is not in the thread-local cache
            if (
                not hasattr(cls._thread_local_cache, key)  # type: ignore[attr-defined]
                or getattr(cls._thread_local_cache, key) is None  # type: ignore[attr-defined]
            ):
                result = func(*args, **kwargs)
                setattr(cls._thread_local_cache, key, result)  # type: ignore[attr-defined]
            return cast("_T2", getattr(cls._thread_local_cache, key))  # type: ignore[attr-defined]

        return wrapper

    return decorator
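
A hedged usage sketch; the class and method are placeholders. Note that the arguments (beyond self) must be JSON-serializable, since the cache key is an MD5 of json.dumps over args and kwargs, and that None results are not cached:

class RateTable:
    @thread_local_class_cache("rates")
    def rates_for(self, country: str) -> list[float]:
        return expensive_rate_lookup(country)  # hypothetical helper

table = RateTable()
table.rates_for("FR")  # computed in this thread
table.rates_for("FR")  # served from threading.local storage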