---
title: "Introduction to CS9"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Introduction to CS9}
  %\VignetteEncoding{UTF-8}
  %\VignetteEngine{knitr::rmarkdown}
editor_options:
  chunk_output_type: console
---

## What is CS9?

Core Surveillance 9 (CS9) is an R framework for building real-time disease surveillance systems. It provides infrastructure for:

- Automated data pipelines for surveillance data
- Database-driven storage and retrieval
- Parallel processing and workflow orchestration
- Built-in support for common surveillance concepts
- Docker-ready deployment for operational systems

CS9 is aimed at public health organizations, epidemiologists, and researchers who need to process surveillance data on a regular, automated schedule.

## When to use CS9

CS9 fits well when you need to:

- Process surveillance data on a regular schedule (daily, weekly)
- Maintain historical data with a proper database backend
- Run analysis pipelines where tasks depend on each other
- Deploy surveillance systems in a production environment
- Handle multiple data sources and analysis outputs
- Apply data validation and quality control systematically

## Key concepts and definitions

| Object | Description |
|---|---|
| argset | A named list containing arguments passed between functions. |
| data_selector_fn | A function that extracts data for each plan. Takes `argset` and `tables` as arguments and returns a named list for use as `data`. |
| action_fn | The core function that performs the analysis work. Takes `data` (from `data_selector_fn`), `argset`, and `tables` as arguments. |
| analysis | An individual computation within a plan. Combines 1 argset with 1 `action_fn`. |
| plan | A data processing unit containing 1 data-pull (using `data_selector_fn`) and multiple analyses. |
| task | The fundamental unit scheduled in CS9. Contains multiple plans and can run in parallel. This is what Airflow schedules. |
| schema | A database table definition with field types, validation rules, and access control. |

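The definitions above can be made concrete with a minimal base-R sketch. It assumes only the signatures given in the table: a `data_selector_fn(argset, tables)` that returns a named list, and an `action_fn(data, argset, tables)` that receives that list as `data`. The function names, the toy data frame, and passing `tables = NULL` are hypothetical stand-ins, not part of the CS9 API.

``` r
# Hypothetical data selector following the (argset, tables) signature.
# A small in-memory data frame stands in for a database pull.
toy_data_selector_fn <- function(argset, tables) {
  d <- data.frame(
    location_code = c("county03", "county03", "county11"),
    tg = c(4.2, 5.1, 7.3)
  )
  # Keep only the rows for the location named in the argset,
  # and return a named list as the table above requires
  list(data = d[d$location_code == argset$location_code, ])
}

# Hypothetical action function: `data` is the named list returned above
toy_action_fn <- function(data, argset, tables) {
  # Mean temperature for the selected location
  mean(data$data$tg)
}

# An argset is a named list of arguments
argset <- list(location_code = "county03")

# `tables` would normally hold CS9 database tables; NULL is enough here
# because neither toy function uses it
data <- toy_data_selector_fn(argset, tables = NULL)
result <- toy_action_fn(data, argset, tables = NULL)
print(result)
```

In a real task, CS9 calls these functions for you and supplies `argset` and `tables`; the sketch only shows the shape of the data flowing between them.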
## Surveillance system architecture

CS9 organizes surveillance work into a three-level hierarchy:

Tasks → Plans → Analyses

- **Tasks** are the unit of scheduled work: things like "download weather data", "calculate disease trends", or "generate reports".
- **Plans** are data processing units within a task, typically organized by time period, geographic area, or data subset.
- **Analyses** are the individual computations within a plan: the actual work performed on the extracted data.

### Example workflow

A typical CS9 surveillance system might include:

1. **Data import**: Download data from external APIs or files
2. **Data cleaning**: Validate, standardize, and quality-check incoming data
3. **Analysis**: Calculate surveillance indicators, trends, or alerts
4. **Reporting**: Generate outputs such as Excel files, plots, or automated reports
5. **Delivery**: Send results by email, upload to a website, or trigger alerts

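The hierarchy can be illustrated with a small base-R sketch. It is not CS9's scheduler: `run_task_sketch()`, the way argsets are merged with `c()`, and the toy selector and action are all hypothetical. It only assumes the behaviour this vignette describes, namely one data-pull per plan, with every analysis in that plan reusing the pulled data under its own argset.

``` r
# Hypothetical runner illustrating Tasks -> Plans -> Analyses (not the CS9 API)
run_task_sketch <- function(for_each_plan, for_each_analysis,
                            data_selector_fn, action_fn, tables = NULL) {
  results <- list()
  for (plan_argset in for_each_plan) {
    # One data-pull per plan
    data <- data_selector_fn(plan_argset, tables)
    for (analysis_argset in for_each_analysis) {
      # Each analysis sees the plan's values plus its own
      argset <- c(plan_argset, analysis_argset)
      results[[length(results) + 1]] <- action_fn(data, argset, tables)
    }
  }
  results
}

# Count how many times the (hypothetical) selector is called
n_pulls <- 0
selector <- function(argset, tables) {
  n_pulls <<- n_pulls + 1
  list(data = data.frame(year = argset$year, location_code = c("a", "b", "c")))
}

# Each analysis subsets the plan's data using its argset
action <- function(data, argset, tables) {
  d <- data$data
  d[d$location_code == argset$location_code, ]
}

res <- run_task_sketch(
  for_each_plan = list(list(year = 2020), list(year = 2021)),
  for_each_analysis = list(
    list(location_code = "a"),
    list(location_code = "b"),
    list(location_code = "c")
  ),
  data_selector_fn = selector,
  action_fn = action
)
print(length(res)) # 6 analyses (2 plans x 3 analyses)
print(n_pulls)     # 2 data-pulls, one per plan
```

Restructuring the same work as 6 plans with 1 analysis each would produce the same 6 results but call the selector 6 times, which is the plan-heavy versus analysis-heavy trade-off.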
Figure 1. A general task showing the many options of a task.
Figure 1 shows the available options within a task. Data can be read from any source. Within a plan, `data_selector_fn` extracts the data **once** (a single data-pull). That data is then passed to each analysis, which calls `action_fn` with:

- the extracted data
- the argset
- the tables

The `action_fn` can then:

- Write data or results to database tables
- Send emails
- Export graphs, Excel files, reports, or other files

A single task would typically do only a subset of these things.

### Plan-heavy or analysis-heavy tasks?

A plan-heavy task has many plans with few analyses each. An analysis-heavy task has few plans with many analyses each.

Data-pulls are relatively slow, so fewer plans means less total time waiting for data. If each data-pull extracts a larger dataset, the analyses can subset it as needed (via argsets). Analysis-heavy tasks are therefore generally faster, at the cost of higher RAM usage. If RAM is limited, use more plans with smaller data-pulls instead.

Figure 1 shows only 2 location-based analyses. In practice, Norway had 356 municipalities in 2021. If Figure 1 had 2 plans (one for 2021 data, one for 2020 data) with 356 analyses each (one per `location_code`), that would be an analysis-heavy approach.

## Other vignettes

- **Installation and setup**: `vignette("installation")` covers database configuration and environment variables
- **Package structure**: `vignette("file-layout")` explains how to organize your implementation files
- **Creating tasks**: `vignette("creating-a-task")` walks through building your first surveillance task step by step

## Example

The following example walks through designing and implementing a weather surveillance task.

### Surveillance system setup

Start by creating a surveillance system. This object coordinates tables and tasks.

``` r
ss <- cs9::SurveillanceSystem_v9$new()
```

### Database table definition

As documented in more detail [here](https://niphr.github.io/csdb/reference/DBTable_v9.html), we create a database table for weather data and add it to the surveillance system.

``` r
ss$add_table(
  name_access = c("anon"),
  name_grouping = "example_weather",
  name_variant = NULL,
  field_types = c(
    "granularity_time" = "TEXT",
    "granularity_geo" = "TEXT",
    "country_iso3" = "TEXT",
    "location_code" = "TEXT",
    "border" = "INTEGER",
    "age" = "TEXT",
    "sex" = "TEXT",
    "isoyear" = "INTEGER",
    "isoweek" = "INTEGER",
    "isoyearweek" = "TEXT",
    "season" = "TEXT",
    "seasonweek" = "DOUBLE",
    "calyear" = "INTEGER",
    "calmonth" = "INTEGER",
    "calyearmonth" = "TEXT",
    "date" = "DATE",
    "tg" = "DOUBLE",
    "tx" = "DOUBLE",
    "tn" = "DOUBLE"
  ),
  keys = c(
    "granularity_time",
    "location_code",
    "date",
    "age",
    "sex"
  ),
  validator_field_types = csdb::validator_field_types_csfmt_rts_data_v1,
  validator_field_contents = csdb::validator_field_contents_csfmt_rts_data_v1
)
```

### Task configuration

Tasks are registered with `ss$add_task`:

``` r
# tm_run_task("example_weather_import_data_from_api")
ss$add_task(
  name_grouping = "example_weather",
  name_action = "import_data_from_api",
  name_variant = NULL,
  cores = 1,
  plan_analysis_fn_name = NULL, # "PACKAGE::TASK_NAME_plan_analysis"
  for_each_plan = plnr::expand_list(
    location_code = "county03" # fhidata::norway_locations_names()[granularity_geo %in% c("county")]$location_code
  ),
  for_each_analysis = NULL,
  universal_argset = NULL,
  upsert_at_end_of_each_plan = FALSE,
  insert_at_end_of_each_plan = FALSE,
  action_fn_name = "example_weather_import_data_from_api_action",
  data_selector_fn_name = "example_weather_import_data_from_api_data_selector",
  tables = list(
    # input

    # output
    "anon_example_weather" = ss$tables$anon_example_weather
  )
)
```

Several parameters are worth explaining in more detail.

#### for_each_plan

`for_each_plan` expects a list.
Each element corresponds to one plan; its values are added to the argset for every analysis inside that plan. The following code produces 4 plans with 1 analysis each, where each analysis receives `argset$var_1` and `argset$var_2`:

``` r
for_each_plan <- list()
for_each_plan[[1]] <- list(
  var_1 = 1,
  var_2 = "a"
)
for_each_plan[[2]] <- list(
  var_1 = 2,
  var_2 = "a"
)
for_each_plan[[3]] <- list(
  var_1 = 1,
  var_2 = "b"
)
for_each_plan[[4]] <- list(
  var_1 = 2,
  var_2 = "b"
)
```

Every task needs at least 1 plan. The simplest plan possible is:

``` r
plnr::expand_list(
  x = 1
)
#> [[1]]
#> [[1]]$x
#> [1] 1
```

#### plnr::expand_list

`plnr::expand_list` works like `expand.grid`, but returns a list (one element per combination) instead of a data frame. The four-element example above can be written more concisely:

``` r
for_each_plan <- plnr::expand_list(
  var_1 = c(1,2),
  var_2 = c("a", "b")
)
for_each_plan
#> [[1]]
#> [[1]]$var_1
#> [1] 1
#> 
#> [[1]]$var_2
#> [1] "a"
#> 
#> 
#> [[2]]
#> [[2]]$var_1
#> [1] 2
#> 
#> [[2]]$var_2
#> [1] "a"
#> 
#> 
#> [[3]]
#> [[3]]$var_1
#> [1] 1
#> 
#> [[3]]$var_2
#> [1] "b"
#> 
#> 
#> [[4]]
#> [[4]]$var_1
#> [1] 2
#> 
#> [[4]]$var_2
#> [1] "b"
```

#### for_each_analysis

`for_each_plan` generates `length(for_each_plan)` plans. `for_each_analysis` works the same way, but generates **analyses** within each plan.

#### universal_argset

A named list whose values are added to the argset of every analysis in the task.

#### upsert_at_end_of_each_plan

When `TRUE` and `tables` contains a table named `output`, the return value of `action_fn` is upserted to `tables$output` at the end of each **plan**.

When `TRUE` and `action_fn` returns a named list, each element is upserted to the corresponding `tables$NAME_FROM_LIST` at the end of each **plan**.

If you upsert or insert manually from within `action_fn`, you can only do so at the end of each **analysis**.

#### insert_at_end_of_each_plan

Same as `upsert_at_end_of_each_plan`, but inserts instead of upserts.
If you insert manually from within `action_fn`, you can only do so at the end of each **analysis**.

#### action_fn_name

A character string naming the action function, preferably including the package name.

#### data_selector_fn_name

A character string naming the data selector function, preferably including the package name.

#### schema

A named list containing the schemas used in this task.

### data_selector_fn

The `data_selector_fn` extracts data for each plan.

The block inside `if(plnr::is_run_directly()){` is for interactive development. When you step through the function body manually, that block executes and loads `argset` and `tables` into your environment, so you can work through the rest of the code line by line.

``` r
index_plan <- 1

argset <- ss$shortcut_get_argset("example_weather_import_data_from_api", index_plan = index_plan)
tables <- ss$shortcut_get_tables("example_weather_import_data_from_api")

print(argset)
#> $`**universal**`
#> [1] "*"
#> 
#> $`**plan**`
#> [1] "*"
#> 
#> $location_code
#> [1] "county03"
#> 
#> $`**analysis**`
#> [1] "*"
#> 
#> $`**automatic**`
#> [1] "*"
#> 
#> $index
#> [1] 1
#> 
#> $today
#> [1] "2025-08-21"
#> 
#> $yesterday
#> [1] "2025-08-20"
#> 
#> $index_plan
#> [1] 1
#> 
#> $index_analysis
#> [1] 1
#> 
#> $first_analysis
#> [1] TRUE
#> 
#> $last_analysis
#> [1] TRUE
#> 
#> $within_plan_first_analysis
#> [1] TRUE
#> 
#> $within_plan_last_analysis
#> [1] TRUE
print(tables)
#> $anon_example_weather
#> public.anon_example_weather (disconnected)
#> 
#> 1: granularity_time (TEXT) (KEY)
#> 2: granularity_geo (TEXT)
#> 3: country_iso3 (TEXT)
#> 4: location_code (TEXT) (KEY)
#> 5: border (INTEGER)
#> 6: age (TEXT) (KEY)
#> 7: sex (TEXT) (KEY)
#> 8: isoyear (INTEGER)
#> 9: isoweek (INTEGER)
#> 10: isoyearweek (TEXT)
#> 11: season (TEXT)
#> 12: seasonweek (DOUBLE)
#> 13: calyear (INTEGER)
#> 14: calmonth (INTEGER)
#> 15: calyearmonth (TEXT)
#> 16: date (DATE) (KEY)
#> 17: tg (DOUBLE)
#> 18: tx (DOUBLE)
#> 19: tn (DOUBLE)
#> 20: auto_last_updated_datetime (DATETIME)
```

``` r
# **** data_selector **** ----
#' example_weather_import_data_from_api (data selector)
#' @param argset Argset
#' @param tables DB tables
#' @export
example_weather_import_data_from_api_data_selector = function(argset, tables){
  if(plnr::is_run_directly()){
    # global$ss$shortcut_get_plans_argsets_as_dt("example_weather_import_data_from_api")

    index_plan <- 1

    argset <- ss$shortcut_get_argset("example_weather_import_data_from_api", index_plan = index_plan)
    tables <- ss$shortcut_get_tables("example_weather_import_data_from_api")
  }

  # find the mid lat/long for the specified location_code
  gps <- fhimaps::norway_nuts3_map_b2020_default_dt[location_code == argset$location_code,.(
    lat = mean(lat),
    long = mean(long)
  )]

  # download the forecast for the specified location_code
  d <- httr::GET(glue::glue("https://api.met.no/weatherapi/locationforecast/2.0/classic?lat={gps$lat}&lon={gps$long}"), httr::content_type_xml())
  d <- xml2::read_xml(d$content)

  # The variable returned must be a named list
  retval <- list(
    "data" = d
  )

  retval
}
```

### action_fn

The `if(plnr::is_run_directly()){` block works the same way here: stepping through the function body manually loads `data`, `argset`, and `tables`, so you can work through the analysis interactively.

``` r
index_plan <- 1
index_analysis <- 1

data <- ss$shortcut_get_data("example_weather_import_data_from_api", index_plan = index_plan)
argset <- ss$shortcut_get_argset("example_weather_import_data_from_api", index_plan = index_plan, index_analysis = index_analysis)
tables <- ss$shortcut_get_tables("example_weather_import_data_from_api")

print(data)
#> $data
#> {xml_document}
#> 
#> [1] \n [2] \n