Core Surveillance 9 (CS9) is an R framework for building real-time disease surveillance systems. It provides infrastructure for:
CS9 is aimed at public health organizations, epidemiologists, and researchers who need to process surveillance data on a regular, automated schedule.
CS9 fits well when you need to:
| Object | Description |
|---|---|
| argset | A named list containing arguments passed between functions. |
| data_selector_fn | A function that extracts data for each plan. Takes argset and tables as arguments and returns a named list for use as data. |
| action_fn | The core function that performs analysis work. Takes data (from data_selector_fn), argset, and tables as arguments. |
| analysis | Individual computation within a plan. Combines 1 argset with 1 action_fn. |
| plan | Data processing unit containing 1 data-pull (using data_selector_fn) and one or more analyses. |
| task | The fundamental unit scheduled in CS9. Contains one or more plans and can run in parallel. This is what Airflow schedules. |
| schema | Database table definition with field types, validation rules, and access control. |
CS9 organizes surveillance work into a three-level hierarchy:
Tasks → Plans → Analyses
A typical CS9 surveillance system might include:
Figure 1 shows the available options within a task.
Data can be read from any source. Within a plan, data_selector_fn extracts the data once (a single data-pull). That data is then passed to each analysis, which calls action_fn with three arguments: data (the output of data_selector_fn), argset, and tables.
The action_fn can then:
A single task would typically do only a subset of these things.
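The execution model described above can be sketched in base R. In this sketch, data_selector_fn runs once per plan, and every analysis in that plan reuses the result with its own argset. The function run_task_sketch, the list layout of plans, and the toy selector and action are hypothetical illustrations. They are not CS9's internal implementation.

```r
# Hypothetical sketch of the task -> plan -> analysis execution model.
# Not the cs9 implementation; names and list layout are illustrative only.
run_task_sketch <- function(plans, data_selector_fn, action_fn, tables = list()) {
  results <- list()
  for (plan in plans) {
    # One data-pull per plan, using the plan-level argset
    data <- data_selector_fn(plan$argset, tables)
    # Every analysis in the plan reuses the same data with its own argset
    for (argset in plan$analyses) {
      results[[length(results) + 1]] <- action_fn(data, argset, tables)
    }
  }
  results
}

# Counter (an environment, so it can be updated from inside a function)
counter <- new.env()
counter$pulls <- 0

selector <- function(argset, tables) {
  counter$pulls <- counter$pulls + 1
  # Stand-in for a slow query returning one year of data
  data.frame(
    year = argset$year,
    location_code = c("a", "b", "c"),
    value = c(1, 2, 3)
  )
}

action <- function(data, argset, tables) {
  # Each analysis subsets the shared data using its own argset
  sum(data$value[data$location_code == argset$location_code])
}

plans <- list(
  list(argset = list(year = 2020),
       analyses = list(list(year = 2020, location_code = "a"),
                       list(year = 2020, location_code = "b"))),
  list(argset = list(year = 2021),
       analyses = list(list(year = 2021, location_code = "c")))
)

res <- run_task_sketch(plans, selector, action)
counter$pulls  # 2 data-pulls for 3 analyses
unlist(res)    # 1 2 3
```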
A plan-heavy task has many plans with few analyses each. An analysis-heavy task has few plans with many analyses each.
Data-pulls are relatively slow, so fewer plans means less total time waiting for data. If each data-pull extracts a larger dataset, the analyses can subset it as needed (via argsets). Analysis-heavy tasks are therefore generally faster, at the cost of higher RAM usage.
If RAM is limited, use more plans with smaller data-pulls instead.
Figure 1 shows only 2 location-based analyses. In practice, Norway had 356 municipalities in 2021. If Figure 1 had 2 plans (one for 2021 data, one for 2020 data) with 356 analyses each (one per location_code), that would be an analysis-heavy approach.
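The arithmetic behind this trade-off can be checked with a short base-R sketch. It assumes one data-pull per plan, as described above, and counts plans and analyses for the two layouts. The two years and the 356 municipalities come from the example. The location codes are placeholders.

```r
# Placeholder codes standing in for Norway's 356 municipalities (2021)
location_codes <- sprintf("municip_placeholder_%03d", seq_len(356))
years <- c(2020, 2021)

# Analysis-heavy layout: one plan per year, one analysis per location
analysis_heavy_plans <- length(years)
analysis_heavy_analyses <- length(years) * length(location_codes)

# Plan-heavy layout: one plan per (year, location), one analysis per plan
plan_heavy_grid <- expand.grid(
  year = years,
  location_code = location_codes,
  stringsAsFactors = FALSE
)
plan_heavy_plans <- nrow(plan_heavy_grid)
plan_heavy_analyses <- nrow(plan_heavy_grid)

# Same number of analyses (712), but one data-pull per plan:
# 2 pulls (analysis-heavy) versus 712 pulls (plan-heavy)
c(analysis_heavy = analysis_heavy_plans, plan_heavy = plan_heavy_plans)
```

The analysis-heavy layout needs fewer pulls, but each pull must hold a full year of data for every location. This is the RAM cost mentioned above.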
- vignette("installation") covers database configuration and environment variables.
- vignette("file-layout") explains how to organize your implementation files.
- vignette("creating-a-task") walks through building your first surveillance task step by step.

The following example walks through designing and implementing a weather surveillance task.
Start by creating a surveillance system. This object coordinates tables and tasks.
Next, we create a database table for weather data and add it to the surveillance system:
ss$add_table(
name_access = c("anon"),
name_grouping = "example_weather",
name_variant = NULL,
field_types = c(
"granularity_time" = "TEXT",
"granularity_geo" = "TEXT",
"country_iso3" = "TEXT",
"location_code" = "TEXT",
"border" = "INTEGER",
"age" = "TEXT",
"sex" = "TEXT",
"isoyear" = "INTEGER",
"isoweek" = "INTEGER",
"isoyearweek" = "TEXT",
"season" = "TEXT",
"seasonweek" = "DOUBLE",
"calyear" = "INTEGER",
"calmonth" = "INTEGER",
"calyearmonth" = "TEXT",
"date" = "DATE",
"tg" = "DOUBLE",
"tx" = "DOUBLE",
"tn" = "DOUBLE"
),
keys = c(
"granularity_time",
"location_code",
"date",
"age",
"sex"
),
validator_field_types = csdb::validator_field_types_csfmt_rts_data_v1,
validator_field_contents = csdb::validator_field_contents_csfmt_rts_data_v1
)
Tasks are registered with ss$add_task:
# tm_run_task("example_weather_import_data_from_api")
ss$add_task(
name_grouping = "example_weather",
name_action = "import_data_from_api",
name_variant = NULL,
cores = 1,
plan_analysis_fn_name = NULL, # "PACKAGE::TASK_NAME_plan_analysis"
for_each_plan = plnr::expand_list(
location_code = "county03" # fhidata::norway_locations_names()[granularity_geo %in% c("county")]$location_code
),
for_each_analysis = NULL,
universal_argset = NULL,
upsert_at_end_of_each_plan = FALSE,
insert_at_end_of_each_plan = FALSE,
action_fn_name = "example_weather_import_data_from_api_action",
data_selector_fn_name = "example_weather_import_data_from_api_data_selector",
tables = list(
# input
# output
"anon_example_weather" = ss$tables$anon_example_weather
)
)
Several parameters are worth explaining in more detail.
for_each_plan expects a list. Each element corresponds
to one plan; its values are added to the argset for every analysis
inside that plan.
The following code produces 4 plans with 1 analysis each, where each
analysis receives argset$var_1 and
argset$var_2:
for_each_plan <- list()
for_each_plan[[1]] <- list(
var_1 = 1,
var_2 = "a"
)
for_each_plan[[2]] <- list(
var_1 = 2,
var_2 = "a"
)
for_each_plan[[3]] <- list(
var_1 = 1,
var_2 = "b"
)
for_each_plan[[4]] <- list(
var_1 = 2,
var_2 = "b"
)
Every task needs at least 1 plan. The simplest plan possible is a single plan with one placeholder argument:
plnr::expand_list(
x = 1
)
plnr::expand_list works like expand.grid, but instead of a data frame it returns a list containing one named list per combination.
The four-element example above can be written more concisely:
for_each_plan <- plnr::expand_list(
var_1 = c(1,2),
var_2 = c("a", "b")
)
for_each_plan
#> [[1]]
#> [[1]]$var_1
#> [1] 1
#>
#> [[1]]$var_2
#> [1] "a"
#>
#>
#> [[2]]
#> [[2]]$var_1
#> [1] 2
#>
#> [[2]]$var_2
#> [1] "a"
#>
#>
#> [[3]]
#> [[3]]$var_1
#> [1] 1
#>
#> [[3]]$var_2
#> [1] "b"
#>
#>
#> [[4]]
#> [[4]]$var_1
#> [1] 2
#>
#> [[4]]$var_2
#> [1] "b"
for_each_plan generates length(for_each_plan) plans. for_each_analysis works the same way, but it generates length(for_each_analysis) analyses within each plan. When for_each_analysis is NULL, each plan has a single analysis.
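If plnr is not at hand, this behavior can be approximated in base R. expand_list_sketch is a hypothetical helper and is not part of plnr. It builds all combinations with expand.grid, where the first argument varies fastest, matching the printed order above. It then converts each row into a named list. Crossing the result with a hypothetical analysis-level list shows how plans and analyses multiply.

```r
# Hypothetical base-R approximation of plnr::expand_list (not plnr's source)
expand_list_sketch <- function(...) {
  grid <- expand.grid(..., stringsAsFactors = FALSE, KEEP.OUT.ATTRS = FALSE)
  # Convert each row of the grid into a named list
  lapply(seq_len(nrow(grid)), function(i) as.list(grid[i, , drop = FALSE]))
}

for_each_plan <- expand_list_sketch(var_1 = c(1, 2), var_2 = c("a", "b"))
length(for_each_plan)  # 4, in the same order as the printed output above

# Hypothetical analysis-level values; every plan gets every analysis
for_each_analysis <- expand_list_sketch(location_code = c("county03", "county11"))
argsets <- unlist(
  lapply(for_each_plan, function(p) lapply(for_each_analysis, function(a) c(p, a))),
  recursive = FALSE
)
length(argsets)  # 4 plans x 2 analyses = 8
```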
universal_argset is a named list whose values are added to the argset of every analysis in the task.
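The printed argsets later in this walkthrough show the order in which values are merged. Universal values come first, then plan values, then analysis values, and finally automatically generated fields, with each section preceded by a marker element. The base-R sketch below reproduces that layout for one analysis. build_argset_sketch is hypothetical. Two details are assumptions: that index counts analyses across the whole task, and that all plans have the same number of analyses.

```r
# Hypothetical sketch of how one analysis's argset could be assembled.
# Mirrors the layout of the printed argsets; not cs9's implementation.
build_argset_sketch <- function(universal, plan, analysis,
                                index_plan, index_analysis,
                                n_plans, n_analyses_per_plan,
                                today = Sys.Date()) {
  # Assumption: index counts analyses across the whole task
  index <- (index_plan - 1) * n_analyses_per_plan + index_analysis
  automatic <- list(
    index = index,
    today = as.character(today),
    yesterday = as.character(today - 1),
    index_plan = index_plan,
    index_analysis = index_analysis,
    first_analysis = index == 1,
    last_analysis = index == n_plans * n_analyses_per_plan,
    within_plan_first_analysis = index_analysis == 1,
    within_plan_last_analysis = index_analysis == n_analyses_per_plan
  )
  # Each section starts with a marker element, as in the printed output;
  # NULL sections (e.g. no universal_argset) simply drop out of c()
  c(
    list("**universal**" = "*"), universal,
    list("**plan**" = "*"), plan,
    list("**analysis**" = "*"), analysis,
    list("**automatic**" = "*"), automatic
  )
}

argset <- build_argset_sketch(
  universal = list(folder = "output/example"),  # placeholder value
  plan = list(location_code = "county03"),
  analysis = NULL,
  index_plan = 1, index_analysis = 1,
  n_plans = 1, n_analyses_per_plan = 1,
  today = as.Date("2025-08-21")
)
argset$yesterday      # "2025-08-20"
argset$last_analysis  # TRUE
```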
When upsert_at_end_of_each_plan is TRUE and tables contains a table named output, the return value of action_fn is upserted to tables$output at the end of each plan.
When it is TRUE and action_fn returns a named list, each element is instead upserted to the corresponding tables$NAME_FROM_LIST at the end of each plan.
If you upsert or insert manually from within action_fn,
you can only do so at the end of each analysis.
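The two upsert cases above amount to a routing rule applied once per plan. The base-R sketch below models that rule, assuming the values returned by action_fn across a plan's analyses are collected and combined per target table. A data frame goes to output, and a named list is routed element by element. This is an illustration of the described behavior, not cs9's code. route_plan_results_sketch and the table names cases and deaths are hypothetical.

```r
# Hypothetical sketch of the end-of-plan routing rule (not cs9's code).
# analysis_results: the values returned by action_fn for each analysis in a plan
route_plan_results_sketch <- function(analysis_results) {
  per_table <- list()
  for (retval in analysis_results) {
    # A data frame goes to "output"; a named list goes element by element
    targets <- if (is.data.frame(retval)) list(output = retval) else retval
    for (name in names(targets)) {
      per_table[[name]] <- rbind(per_table[[name]], targets[[name]])
    }
  }
  per_table  # one combined data frame per target table
}

# Two analyses, each returning a named list for hypothetical tables
results <- list(
  list(cases = data.frame(location_code = "county03", n = 5),
       deaths = data.frame(location_code = "county03", n = 1)),
  list(cases = data.frame(location_code = "county11", n = 7),
       deaths = data.frame(location_code = "county11", n = 0))
)
routed <- route_plan_results_sketch(results)
names(routed)       # "cases" "deaths"
nrow(routed$cases)  # 2 rows, written in one operation at the end of the plan
```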
insert_at_end_of_each_plan works the same way as upsert_at_end_of_each_plan, but inserts instead of upserting.
If you insert manually from within action_fn, you can
only do so at the end of each analysis.
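The practical difference between the two flags is how each one treats rows whose keys already exist. The sketch below is a rough in-memory model, offered as an assumption for illustration. The real operations run in the database and are constrained by the keys declared for the table. In this model, an upsert replaces existing rows with matching keys, while an insert only appends and therefore fails on a duplicate key. upsert_sketch, insert_sketch, and the data values are hypothetical.

```r
# Hypothetical in-memory model of upsert vs insert (not csdb's implementation).
# Keys mirror part of the keys declared for anon_example_weather.
keys <- c("location_code", "date")

# Build one string per row from the key columns
key_string <- function(df, keys) do.call(paste, c(as.list(df[keys]), sep = "|"))

# Upsert: replace rows whose keys already exist, append the rest
upsert_sketch <- function(existing, newdata, keys) {
  keep <- !(key_string(existing, keys) %in% key_string(newdata, keys))
  rbind(existing[keep, , drop = FALSE], newdata)
}

# Insert: append only; an existing key is an error, like a primary-key violation
insert_sketch <- function(existing, newdata, keys) {
  if (any(key_string(newdata, keys) %in% key_string(existing, keys))) {
    stop("duplicate key")
  }
  rbind(existing, newdata)
}

# Made-up values for illustration
existing <- data.frame(
  location_code = "county03",
  date = as.Date(c("2025-08-22", "2025-08-23")),
  tx = c(10, 11)
)
newdata <- data.frame(
  location_code = "county03",
  date = as.Date(c("2025-08-23", "2025-08-24")),
  tx = c(20, 21)
)

upsert_sketch(existing, newdata, keys)$tx  # 10 20 21: the 2025-08-23 row was replaced
inherits(try(insert_sketch(existing, newdata, keys), silent = TRUE), "try-error")  # TRUE
```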
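action_fn_name is a character string naming the action function, preferably including the package name.
data_selector_fn_name is a character string naming the data selector function, preferably including the package name.
tables is a named list containing the schemas used in this task. The functions access each schema by its list name (for example, tables$anon_example_weather).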
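The names in this example follow a visible pattern:

- The task name joins name_grouping and name_action.
- The table name joins name_access and name_grouping.
- The two function names append _action and _data_selector to the task name.

The helpers below simply reproduce that pattern to make it easier to keep names consistent. They are hypothetical conveniences, not cs9 functions. Appending a non-NULL name_variant is an assumption.

```r
# Hypothetical helpers that reproduce the naming pattern used in this example.
# They are not cs9 functions; appending name_variant is an assumption.
task_names_sketch <- function(name_grouping, name_action, name_variant = NULL) {
  task <- paste(c(name_grouping, name_action, name_variant), collapse = "_")
  list(
    task = task,
    action_fn_name = paste0(task, "_action"),
    data_selector_fn_name = paste0(task, "_data_selector")
  )
}

table_name_sketch <- function(name_access, name_grouping, name_variant = NULL) {
  paste(c(name_access, name_grouping, name_variant), collapse = "_")
}

n <- task_names_sketch("example_weather", "import_data_from_api")
n$task                   # "example_weather_import_data_from_api"
n$data_selector_fn_name  # "example_weather_import_data_from_api_data_selector"
table_name_sketch("anon", "example_weather")  # "anon_example_weather"
```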
The data_selector_fn extracts data for each plan.
The block inside if(plnr::is_run_directly()){ is for
interactive development. Running the function manually executes that
block, which loads argset and tables into your
environment so you can step through the rest of the code.
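To see why the development block is skipped during a normal run, the pattern can be imitated without plnr. is_run_directly_sketch is a hypothetical base-R analog, not plnr's implementation. It reports TRUE when called from code evaluated in the global environment, such as lines sent from an editor to the console. It reports FALSE when it executes inside a function call. my_data_selector is likewise a made-up example.

```r
# Hypothetical analog of plnr::is_run_directly (not plnr's implementation):
# TRUE when called from code evaluated in the global environment
is_run_directly_sketch <- function() {
  identical(parent.frame(), globalenv())
}

my_data_selector <- function(argset, tables) {
  if (is_run_directly_sketch()) {
    # Only reached when these lines are run by hand at top level
    argset <- list(location_code = "county03")
  }
  argset$location_code
}

# Called normally, the development block is skipped and the argument is used
my_data_selector(list(location_code = "county11"), tables = list())  # "county11"
```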
index_plan <- 1
argset <- ss$shortcut_get_argset("example_weather_import_data_from_api", index_plan = index_plan)
tables <- ss$shortcut_get_tables("example_weather_import_data_from_api")
print(argset)
#> $`**universal**`
#> [1] "*"
#>
#> $`**plan**`
#> [1] "*"
#>
#> $location_code
#> [1] "county03"
#>
#> $`**analysis**`
#> [1] "*"
#>
#> $`**automatic**`
#> [1] "*"
#>
#> $index
#> [1] 1
#>
#> $today
#> [1] "2025-08-21"
#>
#> $yesterday
#> [1] "2025-08-20"
#>
#> $index_plan
#> [1] 1
#>
#> $index_analysis
#> [1] 1
#>
#> $first_analysis
#> [1] TRUE
#>
#> $last_analysis
#> [1] TRUE
#>
#> $within_plan_first_analysis
#> [1] TRUE
#>
#> $within_plan_last_analysis
#> [1] TRUE
print(tables)
#> $anon_example_weather
#> public.anon_example_weather (disconnected)
#>
#> 1: granularity_time (TEXT) (KEY)
#> 2: granularity_geo (TEXT)
#> 3: country_iso3 (TEXT)
#> 4: location_code (TEXT) (KEY)
#> 5: border (INTEGER)
#> 6: age (TEXT) (KEY)
#> 7: sex (TEXT) (KEY)
#> 8: isoyear (INTEGER)
#> 9: isoweek (INTEGER)
#> 10: isoyearweek (TEXT)
#> 11: season (TEXT)
#> 12: seasonweek (DOUBLE)
#> 13: calyear (INTEGER)
#> 14: calmonth (INTEGER)
#> 15: calyearmonth (TEXT)
#> 16: date (DATE) (KEY)
#> 17: tg (DOUBLE)
#> 18: tx (DOUBLE)
#> 19: tn (DOUBLE)
#> 20: auto_last_updated_datetime (DATETIME)
# **** data_selector **** ----
#' example_weather_import_data_from_api (data selector)
#' @param argset Argset
#' @param tables DB tables
#' @export
example_weather_import_data_from_api_data_selector = function(argset, tables){
if(plnr::is_run_directly()){
# global$ss$shortcut_get_plans_argsets_as_dt("example_weather_import_data_from_api")
index_plan <- 1
argset <- ss$shortcut_get_argset("example_weather_import_data_from_api", index_plan = index_plan)
tables <- ss$shortcut_get_tables("example_weather_import_data_from_api")
}
# find the mid lat/long for the specified location_code
gps <- fhimaps::norway_nuts3_map_b2020_default_dt[location_code == argset$location_code,.(
lat = mean(lat),
long = mean(long)
)]
# download the forecast for the specified location_code
d <- httr::GET(glue::glue("https://api.met.no/weatherapi/locationforecast/2.0/classic?lat={gps$lat}&lon={gps$long}"), httr::content_type_xml())
d <- xml2::read_xml(d$content)
# The variable returned must be a named list
retval <- list(
"data" = d
)
retval
}
In action_fn, the if(plnr::is_run_directly()){ block works the same way: running the function manually loads data, argset, and tables so you can work through the analysis interactively.
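The core of example_weather_import_data_from_api_action (shown further below) reduces forecast intervals to daily values in three steps:

1. Keep intervals starting at 00, 06, 12, or 18.
2. Drop dates that do not have all four intervals.
3. Take the daily maximum of tx and the daily minimum of tn.

The base-R sketch below applies the same rule to a small made-up data frame, so the logic can be checked without calling the MET Norway API. daily_extremes_sketch is a hypothetical name. Its input columns mirror the res data frame built in the action.

```r
# Hypothetical base-R version of the daily reduction (not the package code)
daily_extremes_sketch <- function(res) {
  # Keep intervals starting at 00, 06, 12 or 18 (characters 12-13 hold the hour)
  res <- res[substr(res$time_from, 12, 13) %in% c("00", "06", "12", "18"), ]
  res$date <- as.Date(substr(res$time_from, 1, 10))
  # Keep only dates that have all four six-hour intervals
  n_per_date <- table(res$date)
  complete_dates <- names(n_per_date)[n_per_date == 4]
  res <- res[as.character(res$date) %in% complete_dates, ]
  # Daily maximum of tx and minimum of tn; tg is left missing, as in the action
  out <- merge(
    aggregate(tx ~ date, data = res, FUN = max),
    aggregate(tn ~ date, data = res, FUN = min),
    by = "date"
  )
  out$tg <- NA_real_
  out[, c("date", "tg", "tx", "tn")]
}

# Made-up forecast intervals: 2025-08-22 is complete, 2025-08-23 is not
forecast <- data.frame(
  time_from = c(
    "2025-08-22T00:00:00Z", "2025-08-22T06:00:00Z",
    "2025-08-22T12:00:00Z", "2025-08-22T18:00:00Z",
    "2025-08-23T00:00:00Z", "2025-08-23T06:00:00Z"
  ),
  tx = c(10, 14, 16, 12, 9, 13),
  tn = c(8, 9, 11, 10, 7, 8)
)
daily_extremes_sketch(forecast)  # one row for 2025-08-22: tx = 16, tn = 8
```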
index_plan <- 1
index_analysis <- 1
data <- ss$shortcut_get_data("example_weather_import_data_from_api", index_plan = index_plan)
argset <- ss$shortcut_get_argset("example_weather_import_data_from_api", index_plan = index_plan, index_analysis = index_analysis)
tables <- ss$shortcut_get_tables("example_weather_import_data_from_api")
print(data)
#> $data
#> {xml_document}
#> <weatherdata noNamespaceSchemaLocation="https://schema.api.met.no/schemas/weatherapi-0.4.xsd" created="2025-08-21T13:28:23Z" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
#> [1] <meta>\n <model name="met_public_forecast" termin="2025-08-21T13:00:00Z" ...
#> [2] <product class="pointData">\n <time datatype="forecast" from="2025-08-21 ...
#>
#> $hash
#> $hash$current
#> [1] "fa7c1fd90a6d9bf24040ae24919e6972"
#>
#> $hash$current_elements
#> $hash$current_elements$data
#> [1] "edf868d91cd1f5fe47b57b2aeb1d010d"
print(argset)
#> $`**universal**`
#> [1] "*"
#>
#> $`**plan**`
#> [1] "*"
#>
#> $location_code
#> [1] "county03"
#>
#> $`**analysis**`
#> [1] "*"
#>
#> $`**automatic**`
#> [1] "*"
#>
#> $index
#> [1] 1
#>
#> $today
#> [1] "2025-08-21"
#>
#> $yesterday
#> [1] "2025-08-20"
#>
#> $index_plan
#> [1] 1
#>
#> $index_analysis
#> [1] 1
#>
#> $first_analysis
#> [1] TRUE
#>
#> $last_analysis
#> [1] TRUE
#>
#> $within_plan_first_analysis
#> [1] TRUE
#>
#> $within_plan_last_analysis
#> [1] TRUE
print(tables)
#> $anon_example_weather
#> public.anon_example_weather (disconnected)
#>
#> 1: granularity_time (TEXT) (KEY)
#> 2: granularity_geo (TEXT)
#> 3: country_iso3 (TEXT)
#> 4: location_code (TEXT) (KEY)
#> 5: border (INTEGER)
#> 6: age (TEXT) (KEY)
#> 7: sex (TEXT) (KEY)
#> 8: isoyear (INTEGER)
#> 9: isoweek (INTEGER)
#> 10: isoyearweek (TEXT)
#> 11: season (TEXT)
#> 12: seasonweek (DOUBLE)
#> 13: calyear (INTEGER)
#> 14: calmonth (INTEGER)
#> 15: calyearmonth (TEXT)
#> 16: date (DATE) (KEY)
#> 17: tg (DOUBLE)
#> 18: tx (DOUBLE)
#> 19: tn (DOUBLE)
#> 20: auto_last_updated_datetime (DATETIME)
# **** action **** ----
#' example_weather_import_data_from_api (action)
#' @param data Data
#' @param argset Argset
#' @param tables DB tables
#' @export
example_weather_import_data_from_api_action <- function(data, argset, tables) {
# tm_run_task("example_weather_import_data_from_api")
if(plnr::is_run_directly()){
# global$ss$shortcut_get_plans_argsets_as_dt("example_weather_import_data_from_api")
index_plan <- 1
index_analysis <- 1
data <- ss$shortcut_get_data("example_weather_import_data_from_api", index_plan = index_plan)
argset <- ss$shortcut_get_argset("example_weather_import_data_from_api", index_plan = index_plan, index_analysis = index_analysis)
tables <- ss$shortcut_get_tables("example_weather_import_data_from_api")
}
# code goes here
# special case that runs before everything
if(argset$first_analysis == TRUE){
}
a <- data$data
baz <- xml2::xml_find_all(a, ".//maxTemperature")
res <- vector("list", length = length(baz))
for (i in seq_along(baz)) {
parent <- xml2::xml_parent(baz[[i]])
grandparent <- xml2::xml_parent(parent)
time_from <- xml2::xml_attr(grandparent, "from")
time_to <- xml2::xml_attr(grandparent, "to")
x <- xml2::xml_find_all(parent, ".//minTemperature")
temp_min <- xml2::xml_attr(x, "value")
x <- xml2::xml_find_all(parent, ".//maxTemperature")
temp_max <- xml2::xml_attr(x, "value")
res[[i]] <- data.frame(
time_from = as.character(time_from),
time_to = as.character(time_to),
tx = as.numeric(temp_max),
tn = as.numeric(temp_min)
)
}
res <- data.table::rbindlist(res)
res <- res[stringr::str_sub(time_from, 12, 13) %in% c("00", "06", "12", "18")]
res[, date := as.Date(stringr::str_sub(time_from, 1, 10))]
res[, N := .N, by = date]
res <- res[N == 4]
res <- res[
,
.(
tg = NA,
tx = max(tx),
tn = min(tn)
),
keyby = .(date)
]
# we look at the downloaded data
print("Data after downloading")
print(res)
# we now need to format it
res[, granularity_time := "day"]
res[, sex := "total"]
res[, age := "total"]
res[, location_code := argset$location_code]
res[, border := 2020]
# fill in missing structural variables
cstidy::set_csfmt_rts_data_v1(res)
# we look at the downloaded data
print("Data after missing structural variables filled in")
print(res)
# put data in db table
# tables$TABLE_NAME$insert_data(d)
tables$anon_example_weather$upsert_data(res)
# tables$TABLE_NAME$drop_all_rows_and_then_upsert_data(d)
# special case that runs after everything
# copy to anon_web?
if(argset$last_analysis == TRUE){
# cs9::copy_into_new_table_where(
# table_from = "anon_X",
# table_to = "anon_web_X"
# )
}
}
ss$run_task("example_weather_import_data_from_api")
#> task: example_weather_import_data_from_api
#> Running task=example_weather_import_data_from_api with plans=1 and analyses=1
#> plans=sequential, argset=sequential with cores=1
#>
#> [1] "Data after downloading"
#> Key: <date>
#> date tg tx tn
#> <Date> <lgcl> <num> <num>
#> 1: 2025-08-22 NA 16.0 8.8
#> 2: 2025-08-23 NA 17.6 8.0
#> 3: 2025-08-24 NA 18.4 9.8
#> 4: 2025-08-25 NA 18.1 8.8
#> 5: 2025-08-26 NA 18.8 8.9
#> 6: 2025-08-27 NA 19.7 9.2
#> 7: 2025-08-28 NA 20.4 10.2
#> 8: 2025-08-29 NA 19.0 10.1
#> 9: 2025-08-30 NA 16.9 12.1
#> [1] "Data after missing structural variables filled in"
#> granularity_time granularity_geo country_iso3 location_code border age
#> <char> <char> <char> <char> <int> <char>
#> 1: day county nor county03 2020 total
#> 2: day county nor county03 2020 total
#> 3: day county nor county03 2020 total
#> 4: day county nor county03 2020 total
#> 5: day county nor county03 2020 total
#> 6: day county nor county03 2020 total
#> 7: day county nor county03 2020 total
#> 8: day county nor county03 2020 total
#> 9: day county nor county03 2020 total
#> sex isoyear isoweek isoyearweek season seasonweek calyear calmonth
#> <char> <int> <int> <char> <char> <num> <int> <int>
#> 1: total NA NA <NA> <NA> NA NA NA
#> 2: total NA NA <NA> <NA> NA NA NA
#> 3: total NA NA <NA> <NA> NA NA NA
#> 4: total NA NA <NA> <NA> NA NA NA
#> 5: total NA NA <NA> <NA> NA NA NA
#> 6: total NA NA <NA> <NA> NA NA NA
#> 7: total NA NA <NA> <NA> NA NA NA
#> 8: total NA NA <NA> <NA> NA NA NA
#> 9: total NA NA <NA> <NA> NA NA NA
#> calyearmonth date tg tx tn
#> <char> <Date> <lgcl> <num> <num>
#> 1: <NA> 2025-08-22 NA 16.0 8.8
#> 2: <NA> 2025-08-23 NA 17.6 8.0
#> 3: <NA> 2025-08-24 NA 18.4 9.8
#> 4: <NA> 2025-08-25 NA 18.1 8.8
#> 5: <NA> 2025-08-26 NA 18.8 8.9
#> 6: <NA> 2025-08-27 NA 19.7 9.2
#> 7: <NA> 2025-08-28 NA 20.4 10.2
#> 8: <NA> 2025-08-29 NA 19.0 10.1
#> 9: <NA> 2025-08-30 NA 16.9 12.1
#> Error in super$upsert_data(newdata, drop_indexes, verbose): upsert_load_data_infile not validated in anon_example_weather. granularity_time
ss$add_task(
name_grouping = "example",
name_action = "import_data",
name_variant = NULL,
cores = 1,
plan_analysis_fn_name = NULL,
for_each_plan = plnr::expand_list(
x = 1
),
for_each_analysis = NULL,
universal_argset = list(
folder = cs9::path("input", "example")
),
upsert_at_end_of_each_plan = FALSE,
insert_at_end_of_each_plan = FALSE,
action_fn_name = "example_import_data_action",
data_selector_fn_name = "example_import_data_data_selector",
tables = list(
# input
# output
"output" = ss$tables$output
)
)
ss$add_task(
name_grouping = "example",
name_action = "analysis",
name_variant = NULL,
cores = 1,
plan_analysis_fn_name = NULL,
for_each_plan = plnr::expand_list(
location_code = csdata::nor_locations_names()[granularity_geo %in% c("county")]$location_code
),
for_each_analysis = NULL,
universal_argset = NULL,
upsert_at_end_of_each_plan = FALSE,
insert_at_end_of_each_plan = FALSE,
action_fn_name = "example_analysis_action",
data_selector_fn_name = "example_analysis_data_selector",
tables = list(
# input
"input" = ss$tables$input,
# output
"output" = ss$tables$output
)
)
ss$add_task(
name_grouping = "example",
name_action = "export_results",
name_variant = NULL,
cores = 1,
plan_analysis_fn_name = NULL,
for_each_plan = plnr::expand_list(
location_code = csdata::nor_locations_names()[granularity_geo %in% c("county")]$location_code
),
for_each_analysis = NULL,
universal_argset = list(
folder = cs9::path("output", "example")
),
upsert_at_end_of_each_plan = FALSE,
insert_at_end_of_each_plan = FALSE,
action_fn_name = "example_export_results_action",
data_selector_fn_name = "example_export_results_data_selector",
tables = list(
# input
"input" = ss$tables$input
# output
)
)
ss$add_task(
name_grouping = "example",
name_action = "export_results",
name_variant = NULL,
cores = 1,
plan_analysis_fn_name = NULL,
for_each_plan = plnr::expand_list(
x = 1
),
for_each_analysis = NULL,
universal_argset = list(
folder = cs9::path("output", "example"),
granularity_geos = c("nation", "county")
),
upsert_at_end_of_each_plan = FALSE,
insert_at_end_of_each_plan = FALSE,
action_fn_name = "example_export_results_action",
data_selector_fn_name = "example_export_results_data_selector",
tables = list(
# input
"input" = ss$tables$input
# output
)
)A typical development-to-production cycle looks like this:
- devtools::load_all() for interactive development
- global$ss$run_task("task_name") to run a task

The recommended starting point is the cs9example repository, which provides a working surveillance system you can adapt:
- Replace "cs9example" with your package name
- Adapt 03_tables.R for your data
- Adapt 04_tasks.R for your analyses