test: 增加测试

This commit is contained in:
2026-04-25 01:38:33 +08:00
parent b235651db5
commit 7f2bcf45de
23 changed files with 1573 additions and 1933 deletions

View File

@@ -1,21 +0,0 @@
MIT License
Copyright (c) 2022 Open Spaced Repetition
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -1,29 +0,0 @@
"""
py-fsrs
-------
Py-FSRS is the official Python implementation of the FSRS scheduler algorithm, which can be used to develop spaced repetition systems.
"""
from fsrs.scheduler import Scheduler
from fsrs.state import State
from fsrs.card import Card
from fsrs.rating import Rating
from fsrs.review_log import ReviewLog
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from fsrs.optimizer import Optimizer
# lazy load the Optimizer module due to heavy dependencies
def __getattr__(name: str) -> type:
if name == "Optimizer":
global Optimizer
from fsrs.optimizer import Optimizer
return Optimizer
raise AttributeError
__all__ = ["Scheduler", "Card", "Rating", "ReviewLog", "State", "Optimizer"]

View File

@@ -1,167 +0,0 @@
"""
fsrs.card
---------
This module defines the Card and State classes.
Classes:
Card: Represents a flashcard in the FSRS system.
State: Enum representing the learning state of a Card object.
"""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
import time
import json
from typing import TypedDict
from typing_extensions import Self
from fsrs.state import State
class CardDict(TypedDict):
"""
JSON-serializable dictionary representation of a Card object.
"""
card_id: int
state: int
step: int | None
stability: float | None
difficulty: float | None
due: str
last_review: str | None
@dataclass(init=False)
class Card:
"""
Represents a flashcard in the FSRS system.
Attributes:
card_id: The id of the card. Defaults to the epoch milliseconds of when the card was created.
state: The card's current learning state.
step: The card's current learning or relearning step or None if the card is in the Review state.
stability: Core mathematical parameter used for future scheduling.
difficulty: Core mathematical parameter used for future scheduling.
due: The date and time when the card is due next.
last_review: The date and time of the card's last review.
"""
card_id: int
state: State
step: int | None
stability: float | None
difficulty: float | None
due: datetime
last_review: datetime | None
def __init__(
self,
card_id: int | None = None,
state: State = State.Learning,
step: int | None = None,
stability: float | None = None,
difficulty: float | None = None,
due: datetime | None = None,
last_review: datetime | None = None,
) -> None:
if card_id is None:
# epoch milliseconds of when the card was created
card_id = int(datetime.now(timezone.utc).timestamp() * 1000)
# wait 1ms to prevent potential card_id collision on next Card creation
time.sleep(0.001)
self.card_id = card_id
self.state = state
if self.state == State.Learning and step is None:
step = 0
self.step = step
self.stability = stability
self.difficulty = difficulty
if due is None:
due = datetime.now(timezone.utc)
self.due = due
self.last_review = last_review
def to_dict(self) -> CardDict:
"""
Returns a dictionary representation of the Card object.
Returns:
CardDict: A dictionary representation of the Card object.
"""
return {
"card_id": self.card_id,
"state": self.state.value,
"step": self.step,
"stability": self.stability,
"difficulty": self.difficulty,
"due": self.due.isoformat(),
"last_review": self.last_review.isoformat() if self.last_review else None,
}
@classmethod
def from_dict(cls, source_dict: CardDict) -> Self:
"""
Creates a Card object from an existing dictionary.
Args:
source_dict: A dictionary representing an existing Card object.
Returns:
Self: A Card object created from the provided dictionary.
"""
return cls(
card_id=int(source_dict["card_id"]),
state=State(int(source_dict["state"])),
step=source_dict["step"],
stability=(
float(source_dict["stability"]) if source_dict["stability"] else None
),
difficulty=(
float(source_dict["difficulty"]) if source_dict["difficulty"] else None
),
due=datetime.fromisoformat(source_dict["due"]),
last_review=(
datetime.fromisoformat(source_dict["last_review"])
if source_dict["last_review"]
else None
),
)
def to_json(self, indent: int | str | None = None) -> str:
"""
Returns a JSON-serialized string of the Card object.
Args:
indent: Equivalent argument to the indent in json.dumps()
Returns:
str: A JSON-serialized string of the Card object.
"""
return json.dumps(self.to_dict(), indent=indent)
@classmethod
def from_json(cls, source_json: str) -> Self:
"""
Creates a Card object from a JSON-serialized string.
Args:
source_json: A JSON-serialized string of an existing Card object.
Returns:
Self: A Card object created from the JSON string.
"""
source_dict: CardDict = json.loads(source_json)
return cls.from_dict(source_dict=source_dict)
__all__ = ["Card"]

View File

@@ -1,674 +0,0 @@
"""
fsrs.optimizer
---------
This module defines the optional Optimizer class.
"""
from fsrs.card import Card
from fsrs.review_log import ReviewLog, Rating
from fsrs.scheduler import (
Scheduler,
DEFAULT_PARAMETERS,
LOWER_BOUNDS_PARAMETERS,
UPPER_BOUNDS_PARAMETERS,
)
import math
from datetime import datetime, timezone
from copy import deepcopy
from random import Random
from statistics import mean
try:
import torch
from torch.nn import BCELoss
from torch import optim
import pandas as pd
from tqdm import tqdm
# weight clipping
LOWER_BOUNDS_PARAMETERS_TENSORS = torch.tensor(
LOWER_BOUNDS_PARAMETERS,
dtype=torch.float64,
)
UPPER_BOUNDS_PARAMETERS_TENSORS = torch.tensor(
UPPER_BOUNDS_PARAMETERS,
dtype=torch.float64,
)
# hyper parameters
num_epochs = 5
mini_batch_size = 512
learning_rate = 4e-2
max_seq_len = (
64 # up to the first 64 reviews of each card are used for optimization
)
class Optimizer:
"""
The FSRS optimizer.
Enables the optimization of FSRS scheduler parameters from existing review logs for more accurate interval calculations.
Attributes:
review_logs: A collection of previous ReviewLog objects from a user.
_revlogs_train: The collection of review logs, sorted and formatted for optimization.
"""
review_logs: tuple[ReviewLog, ...]
_revlogs_train: dict
def __init__(
self, review_logs: tuple[ReviewLog, ...] | list[ReviewLog]
) -> None:
"""
Initializes the Optimizer with a set of ReviewLogs. Also formats a copy of the review logs for optimization.
Note that the ReviewLogs provided by the user don't need to be in order.
"""
def _format_revlogs() -> dict:
"""
Sorts and converts the tuple of ReviewLog objects to a dictionary format for optimizing
"""
revlogs_train = {}
for review_log in self.review_logs:
# pull data out of current ReviewLog object
card_id = review_log.card_id
rating = review_log.rating
review_datetime = review_log.review_datetime
review_duration = review_log.review_duration
# if the card was rated Again, it was not recalled
recall = 0 if rating == Rating.Again else 1
# as a ML problem, [x, y] = [ [review_datetime, rating, review_duration], recall ]
datum = [[review_datetime, rating, review_duration], recall]
if card_id not in revlogs_train:
revlogs_train[card_id] = []
revlogs_train[card_id].append((datum))
revlogs_train[card_id] = sorted(
revlogs_train[card_id], key=lambda x: x[0][0]
) # keep reviews sorted
# sort the dictionary in order of when each card history starts
revlogs_train = dict(sorted(revlogs_train.items()))
return revlogs_train
self.review_logs = deepcopy(tuple(review_logs))
# format the ReviewLog data for optimization
self._revlogs_train = _format_revlogs()
def _compute_batch_loss(self, *, parameters: list[float]) -> float:
"""
Computes the current total loss for the entire batch of review logs.
"""
card_ids = list(self._revlogs_train.keys())
params = torch.tensor(parameters, dtype=torch.float64)
loss_fn = BCELoss()
scheduler = Scheduler(parameters=params)
step_losses = []
for card_id in card_ids:
card_review_history = self._revlogs_train[card_id][:max_seq_len]
for i in range(len(card_review_history)):
review = card_review_history[i]
x_date = review[0][0]
y_retrievability = review[1]
u_rating = review[0][1]
if i == 0:
card = Card(card_id=card_id, due=x_date)
y_pred_retrievability = scheduler.get_card_retrievability(
card=card, current_datetime=x_date
)
y_retrievability = torch.tensor(
y_retrievability, dtype=torch.float64
)
if card.last_review and (x_date - card.last_review).days > 0:
step_loss = loss_fn(y_pred_retrievability, y_retrievability)
step_losses.append(step_loss)
card, _ = scheduler.review_card(
card=card,
rating=u_rating,
review_datetime=x_date,
review_duration=None,
)
batch_loss = torch.sum(torch.stack(step_losses))
batch_loss = batch_loss.item() / len(step_losses)
return batch_loss
def compute_optimal_parameters(self, verbose: bool = False) -> list[float]:
"""
Computes a set of optimized parameters for the FSRS scheduler and returns it as a list of floats.
High level explanation of optimization:
---------------------------------------
FSRS is a many-to-many sequence model where the "State" at each step is a Card object at a given point in time,
the input is the time of the review and the output is the predicted retrievability of the card at the time of review.
Each card's review history can be thought of as a sequence, each review as a step and each collection of card review histories
as a batch.
The loss is computed by comparing the predicted retrievability of the Card at each step with whether the Card was actually
sucessfully recalled or not (0/1).
Finally, the card objects at each step in their sequences are updated using the current parameters of the Scheduler
as well as the rating given to that card by the user. The parameters of the Scheduler is what is being optimized.
"""
def _num_reviews() -> int:
"""
Computes how many Review-state reviews there are in the dataset.
Only the loss from Review-state reviews count for optimization and their number must
be computed in advance to properly initialize the Cosine Annealing learning rate scheduler.
"""
scheduler = Scheduler()
num_reviews = 0
# iterate through the card review histories
card_ids = list(self._revlogs_train.keys())
for card_id in card_ids:
card_review_history = self._revlogs_train[card_id][:max_seq_len]
# iterate through the current Card's review history
for i in range(len(card_review_history)):
review = card_review_history[i]
review_datetime = review[0][0]
rating = review[0][1]
# if this is the first review, create the Card object
if i == 0:
card = Card(card_id=card_id, due=review_datetime)
# only non-same-day reviews count
if (
card.last_review
and (review_datetime - card.last_review).days > 0
):
num_reviews += 1
card, _ = scheduler.review_card(
card=card,
rating=rating,
review_datetime=review_datetime,
review_duration=None,
)
return num_reviews
def _update_parameters(
*,
step_losses: list,
adam_optimizer: torch.optim.Adam,
params: torch.Tensor,
lr_scheduler: torch.optim.lr_scheduler.CosineAnnealingLR,
) -> None:
"""
Computes and updates the current FSRS parameters based on the step losses. Also updates the learning rate scheduler.
"""
# Backpropagate through the loss
mini_batch_loss = torch.sum(torch.stack(step_losses))
adam_optimizer.zero_grad() # clear previous gradients
mini_batch_loss.backward() # compute gradients
adam_optimizer.step() # Update parameters
# clamp the weights in place without modifying the computational graph
with torch.no_grad():
params.clamp_(
min=LOWER_BOUNDS_PARAMETERS_TENSORS,
max=UPPER_BOUNDS_PARAMETERS_TENSORS,
)
# update the learning rate
lr_scheduler.step()
# set local random seed for reproducibility
rng = Random(42)
card_ids = list(self._revlogs_train.keys())
num_reviews = _num_reviews()
if num_reviews < mini_batch_size:
return list(DEFAULT_PARAMETERS)
# Define FSRS Scheduler parameters as torch tensors with gradients
params = torch.tensor(
DEFAULT_PARAMETERS, requires_grad=True, dtype=torch.float64
)
loss_fn = BCELoss()
adam_optimizer = optim.Adam([params], lr=learning_rate)
lr_scheduler = optim.lr_scheduler.CosineAnnealingLR(
optimizer=adam_optimizer,
T_max=math.ceil(num_reviews / mini_batch_size) * num_epochs,
)
best_params = None
best_loss = math.inf
# iterate through the epochs
for _ in tqdm(
range(num_epochs),
desc="Optimizing",
unit="epoch",
disable=(not verbose),
):
# randomly shuffle the order of which Card's review histories get computed first
# at the beginning of each new epoch
rng.shuffle(card_ids)
# initialize new scheduler with updated parameters each epoch
scheduler = Scheduler(parameters=params)
# stores the computed loss of each individual review
step_losses = []
# iterate through the card review histories (sequences)
for card_id in card_ids:
card_review_history = self._revlogs_train[card_id][:max_seq_len]
# iterate through the current Card's review history (steps)
for i in range(len(card_review_history)):
review = card_review_history[i]
# input
x_date = review[0][0]
# target
y_retrievability = review[1]
# update
u_rating = review[0][1]
# if this is the first review, create the Card object
if i == 0:
card = Card(card_id=card_id, due=x_date)
# predicted target
y_pred_retrievability = scheduler.get_card_retrievability(
card=card, current_datetime=x_date
)
y_retrievability = torch.tensor(
y_retrievability, dtype=torch.float64
)
# only compute step-loss on non-same-day reviews
if card.last_review and (x_date - card.last_review).days > 0:
step_loss = loss_fn(y_pred_retrievability, y_retrievability)
step_losses.append(step_loss)
# update the card's state
card, _ = scheduler.review_card(
card=card,
rating=u_rating,
review_datetime=x_date,
review_duration=None,
)
# take a gradient step after each mini-batch
if len(step_losses) == mini_batch_size:
_update_parameters(
step_losses=step_losses,
adam_optimizer=adam_optimizer,
params=params,
lr_scheduler=lr_scheduler,
)
# update the scheduler's with the new parameters
scheduler = Scheduler(parameters=params)
# clear the step losses for next batch
step_losses = []
# remove gradient history from tensor card parameters for next batch
card.stability = card.stability.detach()
card.difficulty = card.difficulty.detach()
# update params on remaining review logs
if len(step_losses) > 0:
_update_parameters(
step_losses=step_losses,
adam_optimizer=adam_optimizer,
params=params,
lr_scheduler=lr_scheduler,
)
# compute the current batch loss after each epoch
detached_params = [
x.detach().item() for x in list(params.detach())
] # convert to floats
with torch.no_grad():
epoch_batch_loss = self._compute_batch_loss(
parameters=detached_params
)
# if the batch loss is better with the current parameters, update the current best parameters
if epoch_batch_loss < best_loss:
best_loss = epoch_batch_loss
best_params = detached_params
return best_params
def _compute_probs_and_costs(self) -> dict[str, float]:
review_log_df = pd.DataFrame(
vars(review_log) for review_log in self.review_logs
)
review_log_df = review_log_df.sort_values(
by=["card_id", "review_datetime"], ascending=[True, True]
).reset_index(drop=True)
# dictionary to return
probs_and_costs_dict = {}
# compute the probabilities and costs of the first rating
first_reviews_df = review_log_df.loc[
~review_log_df["card_id"].duplicated(keep="first")
].reset_index(drop=True)
first_again_reviews_df = first_reviews_df.loc[
first_reviews_df["rating"] == Rating.Again
]
first_hard_reviews_df = first_reviews_df.loc[
first_reviews_df["rating"] == Rating.Hard
]
first_good_reviews_df = first_reviews_df.loc[
first_reviews_df["rating"] == Rating.Good
]
first_easy_reviews_df = first_reviews_df.loc[
first_reviews_df["rating"] == Rating.Easy
]
# compute the probability of the user clicking again/hard/good/easy given it's their first review
num_first_again = len(first_again_reviews_df)
num_first_hard = len(first_hard_reviews_df)
num_first_good = len(first_good_reviews_df)
num_first_easy = len(first_easy_reviews_df)
num_first_review = (
num_first_again + num_first_hard + num_first_good + num_first_easy
)
prob_first_again = num_first_again / num_first_review
prob_first_hard = num_first_hard / num_first_review
prob_first_good = num_first_good / num_first_review
prob_first_easy = num_first_easy / num_first_review
probs_and_costs_dict["prob_first_again"] = prob_first_again
probs_and_costs_dict["prob_first_hard"] = prob_first_hard
probs_and_costs_dict["prob_first_good"] = prob_first_good
probs_and_costs_dict["prob_first_easy"] = prob_first_easy
# compute the cost of the user clicking again/hard/good/easy on their first review
first_again_review_durations = list(
first_again_reviews_df["review_duration"]
)
first_hard_review_durations = list(first_hard_reviews_df["review_duration"])
first_good_review_durations = list(first_good_reviews_df["review_duration"])
first_easy_review_durations = list(first_easy_reviews_df["review_duration"])
avg_first_again_review_duration = (
mean(first_again_review_durations)
if first_again_review_durations
else 0
)
avg_first_hard_review_duration = (
mean(first_hard_review_durations) if first_hard_review_durations else 0
)
avg_first_good_review_duration = (
mean(first_good_review_durations) if first_good_review_durations else 0
)
avg_first_easy_review_duration = (
mean(first_easy_review_durations) if first_easy_review_durations else 0
)
probs_and_costs_dict["avg_first_again_review_duration"] = (
avg_first_again_review_duration
)
probs_and_costs_dict["avg_first_hard_review_duration"] = (
avg_first_hard_review_duration
)
probs_and_costs_dict["avg_first_good_review_duration"] = (
avg_first_good_review_duration
)
probs_and_costs_dict["avg_first_easy_review_duration"] = (
avg_first_easy_review_duration
)
# compute the probabilities and costs of non-first ratings
non_first_reviews_df = review_log_df.loc[
review_log_df["card_id"].duplicated(keep="first")
].reset_index(drop=True)
again_reviews_df = non_first_reviews_df.loc[
non_first_reviews_df["rating"] == Rating.Again
]
hard_reviews_df = non_first_reviews_df.loc[
non_first_reviews_df["rating"] == Rating.Hard
]
good_reviews_df = non_first_reviews_df.loc[
non_first_reviews_df["rating"] == Rating.Good
]
easy_reviews_df = non_first_reviews_df.loc[
non_first_reviews_df["rating"] == Rating.Easy
]
# compute the probability of the user clicking hard/good/easy given they correctly recalled the card
num_hard = len(hard_reviews_df)
num_good = len(good_reviews_df)
num_easy = len(easy_reviews_df)
num_recall = num_hard + num_good + num_easy
prob_hard = num_hard / num_recall
prob_good = num_good / num_recall
prob_easy = num_easy / num_recall
probs_and_costs_dict["prob_hard"] = prob_hard
probs_and_costs_dict["prob_good"] = prob_good
probs_and_costs_dict["prob_easy"] = prob_easy
again_review_durations = list(again_reviews_df["review_duration"])
hard_review_durations = list(hard_reviews_df["review_duration"])
good_review_durations = list(good_reviews_df["review_duration"])
easy_review_durations = list(easy_reviews_df["review_duration"])
avg_again_review_duration = (
mean(again_review_durations) if again_review_durations else 0
)
avg_hard_review_duration = (
mean(hard_review_durations) if hard_review_durations else 0
)
avg_good_review_duration = (
mean(good_review_durations) if good_review_durations else 0
)
avg_easy_review_duration = (
mean(easy_review_durations) if easy_review_durations else 0
)
probs_and_costs_dict["avg_again_review_duration"] = (
avg_again_review_duration
)
probs_and_costs_dict["avg_hard_review_duration"] = avg_hard_review_duration
probs_and_costs_dict["avg_good_review_duration"] = avg_good_review_duration
probs_and_costs_dict["avg_easy_review_duration"] = avg_easy_review_duration
return probs_and_costs_dict
def _simulate_cost(
self,
*,
desired_retention: float,
parameters: tuple[float, ...] | list[float],
num_cards_simulate: int,
probs_and_costs_dict: dict[str, float],
) -> float:
rng = Random(42)
# simulate from the beginning of 2025 till before the beginning of 2026
start_date = datetime(2025, 1, 1, 0, 0, 0, 0, timezone.utc)
end_date = datetime(2026, 1, 1, 0, 0, 0, 0, timezone.utc)
scheduler = Scheduler(
parameters=parameters,
desired_retention=desired_retention,
enable_fuzzing=False,
)
# unpack probs_and_costs_dict
prob_first_again = probs_and_costs_dict["prob_first_again"]
prob_first_hard = probs_and_costs_dict["prob_first_hard"]
prob_first_good = probs_and_costs_dict["prob_first_good"]
prob_first_easy = probs_and_costs_dict["prob_first_easy"]
avg_first_again_review_duration = probs_and_costs_dict[
"avg_first_again_review_duration"
]
avg_first_hard_review_duration = probs_and_costs_dict[
"avg_first_hard_review_duration"
]
avg_first_good_review_duration = probs_and_costs_dict[
"avg_first_good_review_duration"
]
avg_first_easy_review_duration = probs_and_costs_dict[
"avg_first_easy_review_duration"
]
prob_hard = probs_and_costs_dict["prob_hard"]
prob_good = probs_and_costs_dict["prob_good"]
prob_easy = probs_and_costs_dict["prob_easy"]
avg_again_review_duration = probs_and_costs_dict[
"avg_again_review_duration"
]
avg_hard_review_duration = probs_and_costs_dict["avg_hard_review_duration"]
avg_good_review_duration = probs_and_costs_dict["avg_good_review_duration"]
avg_easy_review_duration = probs_and_costs_dict["avg_easy_review_duration"]
simulation_cost = 0
for i in range(num_cards_simulate):
card = Card()
curr_date = start_date
while curr_date < end_date:
# the card is new
if curr_date == start_date:
rating = rng.choices(
[Rating.Again, Rating.Hard, Rating.Good, Rating.Easy],
weights=[
prob_first_again,
prob_first_hard,
prob_first_good,
prob_first_easy,
],
)[0]
if rating == Rating.Again:
simulation_cost += avg_first_again_review_duration
elif rating == Rating.Hard:
simulation_cost += avg_first_hard_review_duration
elif rating == Rating.Good:
simulation_cost += avg_first_good_review_duration
elif rating == Rating.Easy:
simulation_cost += avg_first_easy_review_duration
# the card is not new
else:
rating = rng.choices(
["recall", Rating.Again],
weights=[desired_retention, 1.0 - desired_retention],
)[0]
if rating == "recall":
# compute probability that the user chose hard/good/easy, GIVEN that they correctly recalled the card
rating = rng.choices(
[Rating.Hard, Rating.Good, Rating.Easy],
weights=[prob_hard, prob_good, prob_easy],
)[0]
if rating == Rating.Again:
simulation_cost += avg_again_review_duration
elif rating == Rating.Hard:
simulation_cost += avg_hard_review_duration
elif rating == Rating.Good:
simulation_cost += avg_good_review_duration
elif rating == Rating.Easy:
simulation_cost += avg_easy_review_duration
card, _ = scheduler.review_card(
card=card, rating=rating, review_datetime=curr_date
)
curr_date = card.due
total_knowledge = desired_retention * num_cards_simulate
simulation_cost = simulation_cost / total_knowledge
return simulation_cost
def compute_optimal_retention(
self, parameters: tuple[float, ...] | list[float]
) -> list[float]:
def _validate_review_logs() -> None:
if len(self.review_logs) < 512:
raise ValueError(
"Not enough ReviewLog's: at least 512 ReviewLog objects are required to compute optimal retention"
)
for review_log in self.review_logs:
if review_log.review_duration is None:
raise ValueError(
"ReviewLog.review_duration cannot be None when computing optimal retention"
)
_validate_review_logs()
NUM_CARDS_SIMULATE = 1000
DESIRED_RETENTIONS = [0.7, 0.75, 0.8, 0.85, 0.9, 0.95]
probs_and_costs_dict = self._compute_probs_and_costs()
simulation_costs = []
for desired_retention in DESIRED_RETENTIONS:
simulation_cost = self._simulate_cost(
desired_retention=desired_retention,
parameters=parameters,
num_cards_simulate=NUM_CARDS_SIMULATE,
probs_and_costs_dict=probs_and_costs_dict,
)
simulation_costs.append(simulation_cost)
min_index = simulation_costs.index(min(simulation_costs))
optimal_retention = DESIRED_RETENTIONS[min_index]
return optimal_retention
except ImportError:
class Optimizer:
def __init__(self, *args, **kwargs) -> None:
raise ImportError(
'Optimizer is not installed.\nInstall it with: pip install "fsrs[optimizer]"'
)
__all__ = ["Optimizer"]

View File

View File

@@ -1,15 +0,0 @@
from enum import IntEnum
class Rating(IntEnum):
"""
Enum representing the four possible ratings when reviewing a card.
"""
Again = 1
Hard = 2
Good = 3
Easy = 4
__all__ = ["Rating"]

View File

@@ -1,117 +0,0 @@
"""
fsrs.review_log
---------
This module defines the ReviewLog and Rating classes.
Classes:
ReviewLog: Represents the log entry of a Card that has been reviewed.
Rating: Enum representing the four possible ratings when reviewing a card.
"""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from typing import TypedDict
import json
from typing_extensions import Self
from fsrs.rating import Rating
class ReviewLogDict(TypedDict):
"""
JSON-serializable dictionary representation of a ReviewLog object.
"""
card_id: int
rating: int
review_datetime: str
review_duration: int | None
@dataclass
class ReviewLog:
"""
Represents the log entry of a Card object that has been reviewed.
Attributes:
card_id: The id of the card being reviewed.
rating: The rating given to the card during the review.
review_datetime: The date and time of the review.
review_duration: The number of milliseconds it took to review the card or None if unspecified.
"""
card_id: int
rating: Rating
review_datetime: datetime
review_duration: int | None
def to_dict(
self,
) -> ReviewLogDict:
"""
Returns a dictionary representation of the ReviewLog object.
Returns:
ReviewLogDict: A dictionary representation of the ReviewLog object.
"""
return {
"card_id": self.card_id,
"rating": int(self.rating),
"review_datetime": self.review_datetime.isoformat(),
"review_duration": self.review_duration,
}
@classmethod
def from_dict(
cls,
source_dict: ReviewLogDict,
) -> Self:
"""
Creates a ReviewLog object from an existing dictionary.
Args:
source_dict: A dictionary representing an existing ReviewLog object.
Returns:
Self: A ReviewLog object created from the provided dictionary.
"""
return cls(
card_id=source_dict["card_id"],
rating=Rating(int(source_dict["rating"])),
review_datetime=datetime.fromisoformat(source_dict["review_datetime"]),
review_duration=source_dict["review_duration"],
)
def to_json(self, indent: int | str | None = None) -> str:
"""
Returns a JSON-serialized string of the ReviewLog object.
Args:
indent: Equivalent argument to the indent in json.dumps()
Returns:
str: A JSON-serialized string of the ReviewLog object.
"""
return json.dumps(self.to_dict(), indent=indent)
@classmethod
def from_json(cls, source_json: str) -> Self:
"""
Creates a ReviewLog object from a JSON-serialized string.
Args:
source_json: A JSON-serialized string of an existing ReviewLog object.
Returns:
Self: A ReviewLog object created from the JSON string.
"""
source_dict: ReviewLogDict = json.loads(source_json)
return cls.from_dict(source_dict=source_dict)
__all__ = ["ReviewLog"]

View File

@@ -1,856 +0,0 @@
"""
fsrs.scheduler
---------
This module defines the Scheduler class as well as the various constants used in its calculations.
Classes:
Scheduler: The FSRS spaced-repetition scheduler.
"""
from __future__ import annotations
from collections.abc import Sequence
import math
from datetime import datetime, timezone, timedelta
from copy import copy
import json
from random import random
from dataclasses import dataclass
from fsrs.state import State
from fsrs.card import Card
from fsrs.rating import Rating
from fsrs.review_log import ReviewLog
from typing import TYPE_CHECKING, TypedDict, overload
if TYPE_CHECKING:
from torch import Tensor # torch is optional; import only for type checking
from typing_extensions import Self
FSRS_DEFAULT_DECAY = 0.1542
DEFAULT_PARAMETERS = (
0.212,
1.2931,
2.3065,
8.2956,
6.4133,
0.8334,
3.0194,
0.001,
1.8722,
0.1666,
0.796,
1.4835,
0.0614,
0.2629,
1.6483,
0.6014,
1.8729,
0.5425,
0.0912,
0.0658,
FSRS_DEFAULT_DECAY,
)
STABILITY_MIN = 0.001
LOWER_BOUNDS_PARAMETERS = (
STABILITY_MIN,
STABILITY_MIN,
STABILITY_MIN,
STABILITY_MIN,
1.0,
0.001,
0.001,
0.001,
0.0,
0.0,
0.001,
0.001,
0.001,
0.001,
0.0,
0.0,
1.0,
0.0,
0.0,
0.0,
0.1,
)
INITIAL_STABILITY_MAX = 100.0
UPPER_BOUNDS_PARAMETERS = (
INITIAL_STABILITY_MAX,
INITIAL_STABILITY_MAX,
INITIAL_STABILITY_MAX,
INITIAL_STABILITY_MAX,
10.0,
4.0,
4.0,
0.75,
4.5,
0.8,
3.5,
5.0,
0.25,
0.9,
4.0,
1.0,
6.0,
2.0,
2.0,
0.8,
0.8,
)
MIN_DIFFICULTY = 1.0
MAX_DIFFICULTY = 10.0
FUZZ_RANGES = [
{
"start": 2.5,
"end": 7.0,
"factor": 0.15,
},
{
"start": 7.0,
"end": 20.0,
"factor": 0.1,
},
{
"start": 20.0,
"end": math.inf,
"factor": 0.05,
},
]
class SchedulerDict(TypedDict):
"""
JSON-serializable dictionary representation of a Scheduler object.
"""
parameters: list[float]
desired_retention: float
learning_steps: list[int]
relearning_steps: list[int]
maximum_interval: int
enable_fuzzing: bool
@dataclass(init=False)
class Scheduler:
"""
The FSRS scheduler.
Enables the reviewing and future scheduling of cards according to the FSRS algorithm.
Attributes:
parameters: The model weights of the FSRS scheduler.
desired_retention: The desired retention rate of cards scheduled with the scheduler.
learning_steps: Small time intervals that schedule cards in the Learning state.
relearning_steps: Small time intervals that schedule cards in the Relearning state.
maximum_interval: The maximum number of days a Review-state card can be scheduled into the future.
enable_fuzzing: Whether to apply a small amount of random 'fuzz' to calculated intervals.
"""
parameters: tuple[float, ...]
desired_retention: float
learning_steps: tuple[timedelta, ...]
relearning_steps: tuple[timedelta, ...]
maximum_interval: int
enable_fuzzing: bool
def __init__(
self,
parameters: Sequence[float] = DEFAULT_PARAMETERS,
desired_retention: float = 0.9,
learning_steps: tuple[timedelta, ...] | list[timedelta] = (
timedelta(minutes=1),
timedelta(minutes=10),
),
relearning_steps: tuple[timedelta, ...] | list[timedelta] = (
timedelta(minutes=10),
),
maximum_interval: int = 36500,
enable_fuzzing: bool = True,
) -> None:
self._validate_parameters(parameters=parameters)
self.parameters = tuple(parameters)
self.desired_retention = desired_retention
self.learning_steps = tuple(learning_steps)
self.relearning_steps = tuple(relearning_steps)
self.maximum_interval = maximum_interval
self.enable_fuzzing = enable_fuzzing
self._DECAY = -self.parameters[20]
self._FACTOR = 0.9 ** (1 / self._DECAY) - 1
def _validate_parameters(self, *, parameters: Sequence[float]) -> None:
if len(parameters) != len(LOWER_BOUNDS_PARAMETERS):
raise ValueError(
f"Expected {len(LOWER_BOUNDS_PARAMETERS)} parameters, got {len(parameters)}."
)
error_messages = []
for index, (parameter, lower_bound, upper_bound) in enumerate(
zip(parameters, LOWER_BOUNDS_PARAMETERS, UPPER_BOUNDS_PARAMETERS)
):
if not lower_bound <= parameter <= upper_bound:
error_message = f"parameters[{index}] = {parameter} is out of bounds: ({lower_bound}, {upper_bound})"
error_messages.append(error_message)
if len(error_messages) > 0:
raise ValueError(
"One or more parameters are out of bounds:\n"
+ "\n".join(error_messages)
)
def get_card_retrievability(
self, card: Card, current_datetime: datetime | None = None
) -> float:
"""
Calculates a Card object's current retrievability for a given date and time.
The retrievability of a card is the predicted probability that the card is correctly recalled at the provided datetime.
Args:
card: The card whose retrievability is to be calculated
current_datetime: The current date and time
Returns:
float: The retrievability of the Card object.
"""
if card.last_review is None or card.stability is None:
return 0
if current_datetime is None:
current_datetime = datetime.now(timezone.utc)
elapsed_days = max(0, (current_datetime - card.last_review).days)
return (1 + self._FACTOR * elapsed_days / card.stability) ** self._DECAY
def review_card(
self,
card: Card,
rating: Rating,
review_datetime: datetime | None = None,
review_duration: int | None = None,
) -> tuple[Card, ReviewLog]:
"""
Reviews a card with a given rating at a given time for a specified duration.
Args:
card: The card being reviewed.
rating: The chosen rating for the card being reviewed.
review_datetime: The date and time of the review.
review_duration: The number of miliseconds it took to review the card or None if unspecified.
Returns:
tuple[Card,ReviewLog]: A tuple containing the updated, reviewed card and its corresponding review log.
Raises:
ValueError: If the `review_datetime` argument is not timezone-aware and set to UTC.
"""
if review_datetime is not None and (
(review_datetime.tzinfo is None) or (review_datetime.tzinfo != timezone.utc)
):
raise ValueError("datetime must be timezone-aware and set to UTC")
card = copy(card)
if review_datetime is None:
review_datetime = datetime.now(timezone.utc)
days_since_last_review = (
(review_datetime - card.last_review).days if card.last_review else None
)
match card.state:
case State.Learning:
assert card.step is not None
# update the card's stability and difficulty
if card.stability is None or card.difficulty is None:
card.stability = self._initial_stability(rating=rating)
card.difficulty = self._initial_difficulty(
rating=rating, clamp=True
)
elif days_since_last_review is not None and days_since_last_review < 1:
card.stability = self._short_term_stability(
stability=card.stability, rating=rating
)
card.difficulty = self._next_difficulty(
difficulty=card.difficulty, rating=rating
)
else:
card.stability = self._next_stability(
difficulty=card.difficulty,
stability=card.stability,
retrievability=self.get_card_retrievability(
card,
current_datetime=review_datetime,
),
rating=rating,
)
card.difficulty = self._next_difficulty(
difficulty=card.difficulty, rating=rating
)
# calculate the card's next interval
## first if-clause handles edge case where the Card in the Learning state was previously
## scheduled with a Scheduler with more learning_steps than the current Scheduler
if len(self.learning_steps) == 0 or (
card.step >= len(self.learning_steps)
and rating in (Rating.Hard, Rating.Good, Rating.Easy)
):
card.state = State.Review
card.step = None
next_interval_days = self._next_interval(stability=card.stability)
next_interval = timedelta(days=next_interval_days)
else:
match rating:
case Rating.Again:
card.step = 0
next_interval = self.learning_steps[card.step]
case Rating.Hard:
# card step stays the same
if card.step == 0 and len(self.learning_steps) == 1:
next_interval = self.learning_steps[0] * 1.5
elif card.step == 0 and len(self.learning_steps) >= 2:
next_interval = (
self.learning_steps[0] + self.learning_steps[1]
) / 2.0
else:
next_interval = self.learning_steps[card.step]
case Rating.Good:
if card.step + 1 == len(
self.learning_steps
): # the last step
card.state = State.Review
card.step = None
next_interval_days = self._next_interval(
stability=card.stability
)
next_interval = timedelta(days=next_interval_days)
else:
card.step += 1
next_interval = self.learning_steps[card.step]
case Rating.Easy:
card.state = State.Review
card.step = None
next_interval_days = self._next_interval(
stability=card.stability
)
next_interval = timedelta(days=next_interval_days)
case _:
raise ValueError(f"Unknown rating: {rating}")
case State.Review:
assert card.stability is not None
assert card.difficulty is not None
# update the card's stability and difficulty
if days_since_last_review is not None and days_since_last_review < 1:
card.stability = self._short_term_stability(
stability=card.stability, rating=rating
)
else:
card.stability = self._next_stability(
difficulty=card.difficulty,
stability=card.stability,
retrievability=self.get_card_retrievability(
card,
current_datetime=review_datetime,
),
rating=rating,
)
card.difficulty = self._next_difficulty(
difficulty=card.difficulty, rating=rating
)
# calculate the card's next interval
match rating:
case Rating.Again:
# if there are no relearning steps (they were left blank)
if len(self.relearning_steps) == 0:
next_interval_days = self._next_interval(
stability=card.stability
)
next_interval = timedelta(days=next_interval_days)
else:
card.state = State.Relearning
card.step = 0
next_interval = self.relearning_steps[card.step]
case Rating.Hard | Rating.Good | Rating.Easy:
next_interval_days = self._next_interval(
stability=card.stability
)
next_interval = timedelta(days=next_interval_days)
case _:
raise ValueError(f"Unknown rating: {rating}")
case State.Relearning:
assert card.stability is not None
assert card.difficulty is not None
assert card.step is not None
# update the card's stability and difficulty
if days_since_last_review is not None and days_since_last_review < 1:
card.stability = self._short_term_stability(
stability=card.stability, rating=rating
)
card.difficulty = self._next_difficulty(
difficulty=card.difficulty, rating=rating
)
else:
card.stability = self._next_stability(
difficulty=card.difficulty,
stability=card.stability,
retrievability=self.get_card_retrievability(
card,
current_datetime=review_datetime,
),
rating=rating,
)
card.difficulty = self._next_difficulty(
difficulty=card.difficulty, rating=rating
)
# calculate the card's next interval
## first if-clause handles edge case where the Card in the Relearning state was previously
## scheduled with a Scheduler with more relearning_steps than the current Scheduler
if len(self.relearning_steps) == 0 or (
card.step >= len(self.relearning_steps)
and rating in (Rating.Hard, Rating.Good, Rating.Easy)
):
card.state = State.Review
card.step = None
next_interval_days = self._next_interval(stability=card.stability)
next_interval = timedelta(days=next_interval_days)
else:
match rating:
case Rating.Again:
card.step = 0
next_interval = self.relearning_steps[card.step]
case Rating.Hard:
# card step stays the same
if card.step == 0 and len(self.relearning_steps) == 1:
next_interval = self.relearning_steps[0] * 1.5
elif card.step == 0 and len(self.relearning_steps) >= 2:
next_interval = (
self.relearning_steps[0] + self.relearning_steps[1]
) / 2.0
else:
next_interval = self.relearning_steps[card.step]
case Rating.Good:
if card.step + 1 == len(
self.relearning_steps
): # the last step
card.state = State.Review
card.step = None
next_interval_days = self._next_interval(
stability=card.stability
)
next_interval = timedelta(days=next_interval_days)
else:
card.step += 1
next_interval = self.relearning_steps[card.step]
case Rating.Easy:
card.state = State.Review
card.step = None
next_interval_days = self._next_interval(
stability=card.stability
)
next_interval = timedelta(days=next_interval_days)
case _:
raise ValueError(f"Unknown rating: {rating}")
case _:
raise ValueError(f"Unknown card state: {card.state}")
if self.enable_fuzzing and card.state == State.Review:
next_interval = self._get_fuzzed_interval(interval=next_interval)
card.due = review_datetime + next_interval
card.last_review = review_datetime
review_log = ReviewLog(
card_id=card.card_id,
rating=rating,
review_datetime=review_datetime,
review_duration=review_duration,
)
return card, review_log
def reschedule_card(self, card: Card, review_logs: list[ReviewLog]) -> Card:
"""
Reschedules/updates the given card with the current scheduler provided that card's review logs.
If the current card was previously scheduled with a different scheduler, you may want to reschedule/update
it as if it had always been scheduled with this current scheduler. For example, you may want to reschedule
each of your cards with a new scheduler after computing the optimal parameters with the Optimizer.
Args:
card: The card to be rescheduled/updated.
review_logs: A list of that card's review logs (order doesn't matter).
Returns:
Card: A new card that has been rescheduled/updated with this current scheduler.
Raises:
ValueError: If any of the review logs are for a card other than the one specified, this will raise an error.
"""
for review_log in review_logs:
if review_log.card_id != card.card_id:
raise ValueError(
f"ReviewLog card_id {review_log.card_id} does not match Card card_id {card.card_id}"
)
review_logs = sorted(review_logs, key=lambda log: log.review_datetime)
rescheduled_card = Card(card_id=card.card_id, due=card.due)
for review_log in review_logs:
rescheduled_card, _ = self.review_card(
card=rescheduled_card,
rating=review_log.rating,
review_datetime=review_log.review_datetime,
)
return rescheduled_card
def to_dict(
self,
) -> SchedulerDict:
"""
Returns a dictionary representation of the Scheduler object.
Returns:
SchedulerDict: A dictionary representation of the Scheduler object.
"""
return {
"parameters": list(self.parameters),
"desired_retention": self.desired_retention,
"learning_steps": [
int(learning_step.total_seconds())
for learning_step in self.learning_steps
],
"relearning_steps": [
int(relearning_step.total_seconds())
for relearning_step in self.relearning_steps
],
"maximum_interval": self.maximum_interval,
"enable_fuzzing": self.enable_fuzzing,
}
@classmethod
def from_dict(cls, source_dict: SchedulerDict) -> Self:
"""
Creates a Scheduler object from an existing dictionary.
Args:
source_dict: A dictionary representing an existing Scheduler object.
Returns:
Self: A Scheduler object created from the provided dictionary.
"""
return cls(
parameters=source_dict["parameters"],
desired_retention=source_dict["desired_retention"],
learning_steps=[
timedelta(seconds=learning_step)
for learning_step in source_dict["learning_steps"]
],
relearning_steps=[
timedelta(seconds=relearning_step)
for relearning_step in source_dict["relearning_steps"]
],
maximum_interval=source_dict["maximum_interval"],
enable_fuzzing=source_dict["enable_fuzzing"],
)
def to_json(self, indent: int | str | None = None) -> str:
"""
Returns a JSON-serialized string of the Scheduler object.
Args:
indent: Equivalent argument to the indent in json.dumps()
Returns:
str: A JSON-serialized string of the Scheduler object.
"""
return json.dumps(self.to_dict(), indent=indent)
@classmethod
def from_json(cls, source_json: str) -> Self:
"""
Creates a Scheduler object from a JSON-serialized string.
Args:
source_json: A JSON-serialized string of an existing Scheduler object.
Returns:
Self: A Scheduler object created from the JSON string.
"""
source_dict: SchedulerDict = json.loads(source_json)
return cls.from_dict(source_dict=source_dict)
@overload
def _clamp_difficulty(self, *, difficulty: float) -> float: ...
@overload
def _clamp_difficulty(self, *, difficulty: Tensor) -> Tensor: ...
def _clamp_difficulty(self, *, difficulty: float | Tensor) -> float | Tensor:
if isinstance(difficulty, (int, float)):
difficulty = min(max(difficulty, MIN_DIFFICULTY), MAX_DIFFICULTY)
else:
difficulty = difficulty.clamp(min=MIN_DIFFICULTY, max=MAX_DIFFICULTY)
return difficulty
@overload
def _clamp_stability(self, *, stability: float) -> float: ...
@overload
def _clamp_stability(self, *, stability: Tensor) -> Tensor: ...
def _clamp_stability(self, *, stability: float | Tensor) -> float | Tensor:
if isinstance(stability, (int, float)):
stability = max(stability, STABILITY_MIN)
else:
stability = stability.clamp(min=STABILITY_MIN)
return stability
def _initial_stability(self, *, rating: Rating) -> float:
initial_stability = self.parameters[rating - 1]
initial_stability = self._clamp_stability(stability=initial_stability)
return initial_stability
def _initial_difficulty(self, *, rating: Rating, clamp: bool) -> float:
initial_difficulty = (
self.parameters[4] - (math.e ** (self.parameters[5] * (rating - 1))) + 1
)
if clamp:
initial_difficulty = self._clamp_difficulty(difficulty=initial_difficulty)
return initial_difficulty
def _next_interval(self, *, stability: float) -> int:
next_interval = (stability / self._FACTOR) * (
(self.desired_retention ** (1 / self._DECAY)) - 1
)
if not isinstance(next_interval, (int, float)):
next_interval = next_interval.detach().item()
next_interval = round(next_interval) # intervals are full days
# must be at least 1 day long
next_interval = max(next_interval, 1)
# can not be longer than the maximum interval
next_interval = min(next_interval, self.maximum_interval)
return next_interval
def _short_term_stability(self, *, stability: float, rating: Rating) -> float:
short_term_stability_increase = (
math.e ** (self.parameters[17] * (rating - 3 + self.parameters[18]))
) * (stability ** -self.parameters[19])
if rating in (Rating.Good, Rating.Easy):
if isinstance(short_term_stability_increase, (int, float)):
short_term_stability_increase = max(short_term_stability_increase, 1.0)
else:
short_term_stability_increase = short_term_stability_increase.clamp(
min=1.0
)
short_term_stability = stability * short_term_stability_increase
short_term_stability = self._clamp_stability(stability=short_term_stability)
return short_term_stability
def _next_difficulty(self, *, difficulty: float, rating: Rating) -> float:
def _linear_damping(*, delta_difficulty: float, difficulty: float) -> float:
return (10.0 - difficulty) * delta_difficulty / 9.0
def _mean_reversion(*, arg_1: float, arg_2: float) -> float:
return self.parameters[7] * arg_1 + (1 - self.parameters[7]) * arg_2
arg_1 = self._initial_difficulty(rating=Rating.Easy, clamp=False)
delta_difficulty = -(self.parameters[6] * (rating - 3))
arg_2 = difficulty + _linear_damping(
delta_difficulty=delta_difficulty, difficulty=difficulty
)
next_difficulty = _mean_reversion(arg_1=arg_1, arg_2=arg_2)
next_difficulty = self._clamp_difficulty(difficulty=next_difficulty)
return next_difficulty
def _next_stability(
self,
*,
difficulty: float,
stability: float,
retrievability: float,
rating: Rating,
) -> float:
if rating == Rating.Again:
next_stability = self._next_forget_stability(
difficulty=difficulty,
stability=stability,
retrievability=retrievability,
)
elif rating in (Rating.Hard, Rating.Good, Rating.Easy):
next_stability = self._next_recall_stability(
difficulty=difficulty,
stability=stability,
retrievability=retrievability,
rating=rating,
)
else:
raise ValueError(f"Unknown rating: {rating}")
next_stability = self._clamp_stability(stability=next_stability)
return next_stability
def _next_forget_stability(
self, *, difficulty: float, stability: float, retrievability: float
) -> float:
next_forget_stability_long_term_params = (
self.parameters[11]
* (difficulty ** -self.parameters[12])
* (((stability + 1) ** (self.parameters[13])) - 1)
* (math.e ** ((1 - retrievability) * self.parameters[14]))
)
next_forget_stability_short_term_params = stability / (
math.e ** (self.parameters[17] * self.parameters[18])
)
return min(
next_forget_stability_long_term_params,
next_forget_stability_short_term_params,
)
def _next_recall_stability(
self,
*,
difficulty: float,
stability: float,
retrievability: float,
rating: Rating,
) -> float:
hard_penalty = self.parameters[15] if rating == Rating.Hard else 1
easy_bonus = self.parameters[16] if rating == Rating.Easy else 1
return stability * (
1
+ (math.e ** (self.parameters[8]))
* (11 - difficulty)
* (stability ** -self.parameters[9])
* ((math.e ** ((1 - retrievability) * self.parameters[10])) - 1)
* hard_penalty
* easy_bonus
)
def _get_fuzzed_interval(self, *, interval: timedelta) -> timedelta:
"""
Takes the current calculated interval and adds a small amount of random fuzz to it.
For example, a card that would've been due in 50 days, after fuzzing, might be due in 49, or 51 days.
Args:
interval: The calculated next interval, before fuzzing.
Returns:
timedelta: The new interval, after fuzzing.
"""
interval_days = interval.days
if interval_days < 2.5: # fuzz is not applied to intervals less than 2.5
return interval
def _get_fuzz_range(*, interval_days: int) -> tuple[int, int]:
"""
Helper function that computes the possible upper and lower bounds of the interval after fuzzing.
"""
delta = 1.0
for fuzz_range in FUZZ_RANGES:
delta += fuzz_range["factor"] * max(
min(float(interval_days), fuzz_range["end"]) - fuzz_range["start"],
0.0,
)
min_ivl = int(round(interval_days - delta))
max_ivl = int(round(interval_days + delta))
# make sure the min_ivl and max_ivl fall into a valid range
min_ivl = max(2, min_ivl)
max_ivl = min(max_ivl, self.maximum_interval)
min_ivl = min(min_ivl, max_ivl)
return min_ivl, max_ivl
min_ivl, max_ivl = _get_fuzz_range(interval_days=interval_days)
fuzzed_interval_days = (
random() * (max_ivl - min_ivl + 1)
) + min_ivl # the next interval is a random value between min_ivl and max_ivl
fuzzed_interval_days = min(round(fuzzed_interval_days), self.maximum_interval)
fuzzed_interval = timedelta(days=fuzzed_interval_days)
return fuzzed_interval
__all__ = ["Scheduler"]

View File

@@ -1,14 +0,0 @@
from enum import IntEnum
class State(IntEnum):
"""
Enum representing the learning state of a Card object.
"""
Learning = 1
Review = 2
Relearning = 3
__all__ = ["State"]