Schedule Combinators

A schedule describes a stateful, possibly effectful, recurring series of events, and schedules compose in a variety of ways. Combinators let us take existing schedules and combine or transform them to produce new ones.

To demonstrate the functionality of different combinators, we will be working with the following helper:

Delay.ts
ts
import { Effect } from "effect"
 
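// Timestamp of the previous execution (initially, the module load time)
let start = new Date().getTime()

// Logs the milliseconds elapsed since the previous execution
export const log = Effect.sync(() => {
  const now = new Date().getTime()
  console.log(`delay: ${now - start}`)
  start = now
})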

The log helper (used as Delay.log in the examples below) logs the time elapsed since its previous execution. We will use this effect to showcase the behavior of various built-in combinators.

Composition

Schedules compose in the following primary ways:

  • Union. This performs the union of the intervals of two schedules.
  • Intersection. This performs the intersection of the intervals of two schedules.
  • Sequencing. This concatenates the intervals of one schedule onto another.

Union

Combines two schedules through union, by recurring if either schedule wants to recur, using the minimum of the two delays between recurrences.

ts
import { Effect, Schedule } from "effect"
import * as Delay from "./Delay"
 
const schedule = Schedule.union(
  Schedule.exponential("100 millis"),
  Schedule.spaced("1 seconds")
)
 
Effect.runPromise(Effect.repeat(Delay.log, schedule))
/*
Output:
delay: 3
delay: 115 < exponential
delay: 202
delay: 404
delay: 802
delay: 1002 < spaced
delay: 1006
delay: 1006
delay: 1006
delay: 1006
...
*/

When we use the combined schedule with Effect.repeat, each recurrence waits for the smaller of the two delays proposed by the schedules. In this case, the exponential schedule (increasing delay) determines the delay while its delay is below one second; once it exceeds one second, the spaced schedule (constant delay) takes over.
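To see where the numbers in the output above come from, we can write out the delay each schedule proposes at every step and take the minimum. The following is a plain TypeScript sketch of that arithmetic, not the library's implementation: DelayAt, exponentialDelay, spacedDelay, unionDelay, and firstDelays are hypothetical helpers introduced only for illustration, and the doubling factor is an assumption read off the output.

```ts
// A toy model: each function maps the recurrence index to a delay in milliseconds.
type DelayAt = (step: number) => number

// Exponential backoff: base * factor^step (a factor of 2 is assumed here).
const exponentialDelay = (baseMillis: number, factor: number = 2): DelayAt =>
  (step) => baseMillis * Math.pow(factor, step)

// Fixed spacing: the same delay at every step.
const spacedDelay = (millis: number): DelayAt => () => millis

// Union waits for the shorter of the two proposed delays.
const unionDelay = (left: DelayAt, right: DelayAt): DelayAt =>
  (step) => Math.min(left(step), right(step))

// Evaluate the first `count` delays of a model schedule.
const firstDelays = (delayAt: DelayAt, count: number): number[] => {
  const delays: number[] = []
  for (let step = 0; step < count; step++) delays.push(delayAt(step))
  return delays
}

console.log(
  firstDelays(unionDelay(exponentialDelay(100), spacedDelay(1000)), 6).join(", ")
)
// prints: 100, 200, 400, 800, 1000, 1000
```

The modeled delays line up with the logged ones, which are a few milliseconds larger because of execution overhead.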

Intersection

Combines two schedules through intersection, by recurring only if both schedules want to recur, using the maximum of the two delays between recurrences.

ts
import { Effect, Schedule } from "effect"
import * as Delay from "./Delay"
 
const schedule = Schedule.intersect(
  Schedule.exponential("10 millis"),
  Schedule.recurs(5)
)
 
Effect.runPromise(Effect.repeat(Delay.log, schedule))
/*
Output:
delay: 4
delay: 18 < recurs
delay: 22
delay: 45
delay: 84
delay: 166
*/

When we use the combined schedule with Effect.repeat, we observe that the effect is executed repeatedly only while both schedules want it to recur. The delay between recurrences is the maximum of the two delays proposed by the schedules. In this case, recurs adds no delay of its own, so the delay follows the progression of the exponential schedule until the maximum number of recurrences specified by the recurs schedule is reached.
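The same rule can be sketched in plain TypeScript by modeling a schedule as a function that returns a delay, or undefined once it no longer wants to recur. This is a simplified model under our own assumptions, not the library's implementation: DelayPolicy, exponentialPolicy, recursPolicy, intersectPolicies, and collectDelays are hypothetical names used only for illustration.

```ts
// A toy model: a policy maps the recurrence index to a delay in milliseconds,
// or to undefined once it no longer wants to recur.
type DelayPolicy = (step: number) => number | undefined

// Exponential backoff that never stops (a factor of 2 is assumed here).
const exponentialPolicy = (baseMillis: number, factor: number = 2): DelayPolicy =>
  (step) => baseMillis * Math.pow(factor, step)

// Allows `times` recurrences and adds no delay of its own.
const recursPolicy = (times: number): DelayPolicy =>
  (step) => (step < times ? 0 : undefined)

// Intersection recurs only while both policies recur and waits for the longer delay.
const intersectPolicies = (left: DelayPolicy, right: DelayPolicy): DelayPolicy =>
  (step) => {
    const l = left(step)
    const r = right(step)
    return l === undefined || r === undefined ? undefined : Math.max(l, r)
  }

// Collect delays until the policy stops (capped so an endless policy terminates).
const collectDelays = (policy: DelayPolicy, cap: number = 100): number[] => {
  const delays: number[] = []
  for (let step = 0; step < cap; step++) {
    const delay = policy(step)
    if (delay === undefined) break
    delays.push(delay)
  }
  return delays
}

console.log(
  collectDelays(intersectPolicies(exponentialPolicy(10), recursPolicy(5))).join(", ")
)
// prints: 10, 20, 40, 80, 160
```

Five delays come out, one per recurrence allowed by the recurs policy, and each one is the exponential delay because it is always the larger of the two.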

Sequencing

Combines two schedules sequentially, by following the first policy until it ends, and then following the second policy.

ts
import { Effect, Schedule } from "effect"
import * as Delay from "./Delay"
 
const schedule = Schedule.andThen(
  Schedule.recurs(5),
  Schedule.spaced("1 seconds")
)
 
Effect.runPromise(Effect.repeat(Delay.log, schedule))
/*
Output:
delay: 4
delay: 8 < recurs
delay: 4
delay: 2
delay: 0
delay: 2
delay: 1005 < spaced
delay: 1007
delay: 1006
delay: 1007
delay: 1005
...
*/

When we use the combined schedule with Effect.repeat, we observe that the effect follows the policy of the first schedule (recurs) until it completes the specified number of recurrences. After that, it switches to the policy of the second schedule (spaced) and continues repeating the effect with the fixed delay between recurrences.
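As a rough illustration of this hand-over, the sketch below models each schedule as a function from the recurrence index to a delay, with undefined meaning the schedule has ended. It is a simplified model under our own assumptions, not the library's implementation: DelayPolicy, recursPolicy, spacedPolicy, andThenPolicy, and collectDelays are hypothetical names.

```ts
// A toy model: a policy maps the recurrence index to a delay in milliseconds,
// or to undefined once it no longer wants to recur.
type DelayPolicy = (step: number) => number | undefined

// Allows `times` recurrences and adds no delay of its own.
const recursPolicy = (times: number): DelayPolicy =>
  (step) => (step < times ? 0 : undefined)

// Fixed spacing that never stops.
const spacedPolicy = (millis: number): DelayPolicy => () => millis

// Sequencing follows `first` until it ends, then starts `second` from its own step 0.
const andThenPolicy = (first: DelayPolicy, second: DelayPolicy): DelayPolicy =>
  (step) => {
    // Count how many recurrences `first` allows, looking no further than `step`.
    let firstLength = 0
    while (firstLength <= step && first(firstLength) !== undefined) firstLength++
    return step < firstLength ? first(step) : second(step - firstLength)
  }

// Collect at most `cap` delays, stopping early if the policy ends.
const collectDelays = (policy: DelayPolicy, cap: number): number[] => {
  const delays: number[] = []
  for (let step = 0; step < cap; step++) {
    const delay = policy(step)
    if (delay === undefined) break
    delays.push(delay)
  }
  return delays
}

console.log(
  collectDelays(andThenPolicy(recursPolicy(5), spacedPolicy(1000)), 8).join(", ")
)
// prints: 0, 0, 0, 0, 0, 1000, 1000, 1000
```

The five zero delays correspond to the near-instant repetitions in the output above, and the one-second delays to the spaced phase that follows.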

Jittering

The jittered combinator takes a schedule and returns another schedule of the same type, except that each delay is randomly adjusted.

When a resource is out of service due to overload or contention, retrying with backoff alone doesn't help us: if all failed API calls back off to the same point in time, they cause another overload or contention when they retry together. Jitter adds some randomness to the delay of the schedule, which helps us avoid accidentally synchronizing retries and taking the service down again.

Research shows that Schedule.jittered(0.0, 1.0) is very suitable for retrying.

ts
import { Effect, Schedule } from "effect"
import * as Delay from "./Delay"
 
const schedule = Schedule.jittered(Schedule.exponential("10 millis"))
 
Effect.runPromise(Effect.repeat(Delay.log, schedule))
/*
Output:
delay: 3
delay: 18
delay: 24
delay: 48
delay: 92
delay: 184
delay: 351
delay: 620
delay: 1129
delay: 2576
...
*/

In this example, we use the jittered combinator to apply jitter to an exponential schedule. The exponential schedule increases the delay between each repetition exponentially. By adding jitter to the schedule, the delays become randomly adjusted within a certain range.
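The adjustment can be pictured as multiplying each delay by a random factor. The sketch below is a plain TypeScript model of that idea, not the library's implementation: jitterDelay is a hypothetical helper, and the 0.8 to 1.2 factor range is an assumption chosen because it is consistent with the output above.

```ts
// Scale a delay by a random factor between `min` and `max`.
// The 0.8 to 1.2 range is an assumption made for this sketch.
const jitterDelay = (
  delayMillis: number,
  random: () => number = Math.random,
  min: number = 0.8,
  max: number = 1.2
): number => delayMillis * (min + random() * (max - min))

// Exponential delays of 10, 20, 40, ... milliseconds with jitter applied.
for (let step = 0; step < 5; step++) {
  const base = 10 * Math.pow(2, step)
  console.log(`base: ${base}, jittered: ${jitterDelay(base).toFixed(1)}`)
}
```

Passing the random source as a parameter keeps the sketch deterministic when needed; for example, jitterDelay(100, () => 0.5) yields the midpoint of the range, which is approximately 100.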

Filtering

We can filter inputs or outputs of a schedule with whileInput and whileOutput.

ts
import { Effect, Schedule } from "effect"
import * as Delay from "./Delay"
 
const schedule = Schedule.whileOutput(Schedule.recurs(5), (n) => n <= 2)
 
Effect.runPromise(Effect.repeat(Delay.log, schedule))
/*
Output:
delay: 2
delay: 11 < recurs
delay: 1
delay: 1
(end) < whileOutput
*/

In this example, we create a schedule using Schedule.recurs(5) to repeat an action up to 5 times, and apply the whileOutput combinator with a predicate that lets the schedule continue only while its output is at most 2. As a result, the schedule stops as soon as its output exceeds 2, and the repetition ends early.
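To make the early stop concrete, here is a small plain TypeScript model of the counting involved. It is a sketch, not the library's implementation: countRecurrences is a hypothetical helper, and it assumes that the recurs schedule outputs the number of recurrences so far, starting at 0.

```ts
// Count how many recurrences happen when a recurs-style schedule
// (assumed to output 0, 1, 2, ...) is limited by a whileOutput-style predicate.
const countRecurrences = (
  maxRecurs: number,
  keep: (output: number) => boolean
): number => {
  let recurrences = 0
  // The current count is the output checked before allowing the next recurrence.
  while (recurrences < maxRecurs && keep(recurrences)) recurrences++
  return recurrences
}

console.log(countRecurrences(5, (n) => n <= 2))
// prints: 3
```

Three recurrences plus the initial execution give the four delay lines shown in the output above.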

Modifying

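The modifyDelay combinator replaces the delay a schedule would otherwise use with the one returned by the given function. In the following example, the one-second spacing becomes 100 milliseconds.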

ts
import { Effect, Schedule } from "effect"
import * as Delay from "./Delay"
 
const schedule = Schedule.modifyDelay(
  Schedule.spaced("1 seconds"),
  (_) => "100 millis"
)
 
Effect.runPromise(Effect.repeat(Delay.log, schedule))
/*
Output:
delay: 4
delay: 110 < modifyDelay
delay: 103
delay: 103
delay: 106
...
*/

Tapping

Whenever we need to effectfully process each schedule input/output, we can use tapInput and tapOutput.

We can use these two functions for logging purposes:

ts
import { Effect, Schedule, Console } from "effect"
import * as Delay from "./Delay"
 
const schedule = Schedule.tapOutput(Schedule.recurs(2), (n) =>
  Console.log(`repeating ${n}`)
)
 
Effect.runPromise(Effect.repeat(Delay.log, schedule))
/*
Output:
delay: 2
repeating 0
delay: 8
repeating 1
delay: 5
repeating 2
*/