Building a Data Ingress Business Rule
Introduction
Any standard SensibleAI Forecast implementation includes building a data pipeline in its scope. The Data Ingress Data Management job's purpose is to extract actuals, features, and mapping tables from the external source database, transform the actuals and feature datasets into SensibleAI Forecast formats, and load them to the AI Services Data Sources Database within the environment. Because of this, Data Ingress Business Rules can often be constructed from a template or simple code snippets, with small modifications to meet the stakeholder's requirements.
This is a pre-Xperiflow Business Rules document. There may be helper functions in XBR that can speed up development or replace the need for workspaces and packages referenced in this document.
This is a pre-SensibleAI Studio document. There may be routines in AI Studio that can replace code called out in this document.
Common Operating Scope or Framework
At its core, most Ingress rules will look like some combination and ordering of the following:
- Load in all required tables from the AI Services Data Sources Database to DataTable objects
- Load in all required tables from the Customer Database to DataTable objects
- Create Actuals Data
  - Join with mapping tables to incorporate additional target metadata
  - Transform Date column(s) to a DateTime type
  - Clean Target Dimension columns to a specific format
  - Filter data for entries between a date range
  - Filter data for entries belonging to a target set
  - Aggregate to a higher target dimensionality
  - Resample to a different frequency
  - Remove shutdown targets
  - Flag cold start targets
- Create Feature Data
  - Join with mapping tables for target mapping
  - Transform Date column(s) to a DateTime type
  - Clean Feature Description column to a specific format
  - Filter data for entries between a date range
  - Resample to a different frequency
- Save Actuals and Feature DataTables to the AI Services Data Sources Database
Creating a Business Rule
To create a business rule, navigate to the Business Rules page under Application > Tools > Business Rules in the OneStream environment. Then, select "Create a Business Rule".
On the window that pops up, set the Type to "Extender", name the business rule, and select C# as the language. A recommended naming convention is "SML_[UseCase]_DataIngress", where the use case is a short description of the purpose of the SensibleAI Forecast implementation. This could be demand, a specific division or region in the company, or a P&L, for example.
Once created, the business rule can be found in the Extensibility Rules folder on the Business Rules page. Select the business rule to edit it.
There are two sections in the business rule: Properties and Formula. The Formula page contains all the code for the business rule. The default extender business rule contains a series of using statements and a namespace that contains a MainClass class with a Main object. This Main object is what executes when the business rule runs. As this document later covers, it is recommended to use the Main object to execute a series of functions that the designer creates. These functions should be public static methods within the MainClass scope.
The Properties page is used to modify security access, which may or may not be changed, depending on the customer. Importantly, the “Referenced Assemblies” field needs to be modified to include any external business rules, assemblies, or workspaces that the business rule will reference. When Xperiflow Business Rules become standard, the assembly will need to be incorporated. In this document, the AISXDT_SolutionHelper business rule is referenced, as it has built-in functions for loading to DataTables and writing to AI Services Data Sources. The “BR\” indicates that this is a business rule, and the “AISXDT_SolutionHelper” is its name.
BR\AISXDT_SolutionHelper
Any referenced assemblies can be accessed in the business rule body with syntax similar to this:
// Create a reference object for the referenced business rule's MainClass
var xdtSolutionHelper = new DashboardExtender.AISXDT_SolutionHelper.MainClass();
// Execute any method inside that business rule
xdtSolutionHelper.GetDatabaseConnection();
Beginning Preparation
The recommended first step for a data ingress sequence is to fully design it as a flow diagram. This diagram should track every individual step or transform that each table undergoes. It’s quite likely that the entire process has been coded in SQL up until this point, throughout the natural course of RPE. Convert each line in the SQL code to a step in a flow diagram. What columns are being added / removed with each SELECT or JOIN statement? What calculations are performed? What filters are applied with each WHERE clause?
This flow diagram is useful for another reason: it becomes a supplementary deliverable to the customer. This way, the customer can confirm and agree with the process before any coding and troubleshooting occurs.
After confirming with the customer that the Data Ingress sequence is appropriate, it is recommended to create comments or sections in the Business Rule Main object to outline the required function calls. This could look something like this:
// using statements up here
namespace OneStream.BusinessRule.Extender.EXAMPLE_INGRESS
{
    public class MainClass
    {
        public object Main(SessionInfo si, BRGlobals globals, object api, ExtenderArgs args)
        {
            try
            {
                // Step 1: Define or load all constants
                // Step 2: Load in all tables from AIS Data Sources
                // Step 3: Load in all tables from Customer Database
                // Step 4: Create Actuals Dataset
                // Step 5: Create Feature Dataset
                // Step 6: Push output tables to AIS Data Sources
                return null;
            }
            catch (Exception ex)
            {
                throw ErrorHandler.LogWrite(si, new XFException(si, ex));
            }
        }
    }
}
The last recommendation is to functionalize all aspects of the code. Not only does this help with debugging later on, but it makes the code significantly easier to read. A significant portion of the Main object code should be calls to functions defined in other workspaces or within the same MainClass of the business rule.
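For illustration, once the outline above is filled in, the body of the try block might reduce to a handful of calls to helper functions like the ones defined in the next section. This is a sketch only; the table names, connection name, and date below are hypothetical:
// Step 1: Define or load all constants
DateTime maxDate = new DateTime(2024, 12, 31);
// Step 2: Load in all tables from AIS Data Sources
DataTable mappingTable = PullAISDSTable(si, "XFW_SML_LocationMapping");
// Step 3: Load in all tables from Customer Database
DataTable rawActuals = PullDataTableFromSICConnection(si, "CustomerERP", "dbo.SalesActuals");
DataTable rawFeature1 = PullDataTableFromSICConnection(si, "CustomerERP", "dbo.Feature1");
DataTable rawFeature2 = PullDataTableFromSICConnection(si, "CustomerERP", "dbo.Feature2");
// Step 4: Create Actuals Dataset
DataTable actualsTable = CreateActualsTable("SML_Actuals", rawActuals, mappingTable, maxDate, si);
// Step 5: Create Feature Dataset
DataTable featureTable = CreateFeatureTable("SML_Features", rawFeature1, rawFeature2, maxDate, si);
// Step 6: Push output tables to AIS Data Sources
var xdtSolutionHelper = new DashboardExtender.AISXDT_SolutionHelper.MainClass();
xdtSolutionHelper.PersistDatabaseTable(si, "AI Services Data Sources", actualsTable, BlendTableLoadTypes.DropAndRecreate, BlendTableIndexTypes.ClusteredColStore);
xdtSolutionHelper.PersistDatabaseTable(si, "AI Services Data Sources", featureTable, BlendTableLoadTypes.DropAndRecreate, BlendTableIndexTypes.ClusteredColStore);
return null;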
Common Code Snippets
For each step of an ingress sequence, here are short, generic code snippets that can be functionalized.
DataTable Import - AI Services Data Sources
// Purpose: Create a DataTable object from a SQL table in AI Services Data Sources
// Parameters: SessionInfo si - Current SessionInfo object for logging purposes
//             string sourceTableName - Exact name of table in database to pull from
// Return Values: Returns the DataTable object of the table.
// Exceptions: ArgumentNullException - Input table name is null.
//             InvalidOperationException - General unable to complete, or table is null or empty after pulling in.
public static DataTable PullAISDSTable(SessionInfo si, string sourceTableName)
{
    try
    {
        if (sourceTableName == null)
        {
            throw new ArgumentNullException("sourceTableName", "Cannot PullAISDSTable() due to null table name.");
        }
        var xdtSolutionHelper = new DashboardExtender.AISXDT_SolutionHelper.MainClass();
        DataTable dt = xdtSolutionHelper.LoadDatabaseTable(si, "AI Services Data Sources", sourceTableName, -1);
        dt.TableName = sourceTableName;
        if (IsDataTablePopulated(dt))
        {
            return dt;
        }
        else
        {
            throw new InvalidOperationException($"{sourceTableName} is empty or null after PullAISDSTable().");
        }
    }
    catch (ArgumentNullException)
    {
        throw;
    }
    catch (InvalidOperationException)
    {
        throw;
    }
    catch (Exception)
    {
        throw new InvalidOperationException($"Unable to PullAISDSTable() - {sourceTableName}.");
    }
}
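For example, a mapping table saved in AI Services Data Sources could be pulled with a single call (the table name here is hypothetical):
DataTable mappingTable = PullAISDSTable(si, "XFW_SML_LocationMapping");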
DataTable Import - Customer Database via SQL over an SIC Connection
// Purpose: Create a DataTable object from a SQL table in a Customer Database
// Parameters: SessionInfo si - Current SessionInfo object for logging purposes
//             string dbConnectionName - Name of the external database connection (SIC) to pull from
//             string tableName - Table name in the Customer Database
// Return Values: Returns the DataTable object of the table.
// Exceptions: ArgumentNullException - Input table name is null.
//             InvalidOperationException - General unable to complete, error with type
//             conversions, or table is null or empty after loading in.
public static DataTable PullDataTableFromSICConnection(SessionInfo si, string dbConnectionName, string tableName)
{
    try
    {
        if (tableName == null)
        {
            throw new ArgumentNullException("tableName", "Cannot PullDataTableFromSICConnection() due to null table name.");
        }
        // Build out the table schema
        DataTable dt = new DataTable();
        dt.Columns.Add("Date", typeof(DateTime));
        dt.Columns.Add("TargetDim1", typeof(string));
        dt.Columns.Add("TargetDim2", typeof(string));
        dt.Columns.Add("TargetDim3", typeof(string));
        dt.Columns.Add("Location", typeof(string));
        dt.Columns.Add("Value", typeof(decimal));
        dt.TableName = tableName;
        // Build a SQL query for a simple extraction from the database
        StringBuilder sqlCall = new StringBuilder();
        sqlCall.Append("SELECT Date_Source, TargetDim1_Source, TargetDim2_Source, TargetDim3_Source, Location_Source, Value_Source ");
        sqlCall.Append($"FROM {tableName} ");
        // Add any required filtering or aggregation clauses (not recommended, use only if DataTable size is a concern)
        sqlCall.Append("WHERE Location_Source = 'ExampleLocation' ");
        // Execute the SQL query against the SIC connection and copy rows from the temporary table to the output table
        DataTable tempDT = BRApi.Database.ExecuteSql(BRApi.Database.CreateExternalDbConnInfo(si, dbConnectionName), sqlCall.ToString(), false);
        foreach (DataRow row in tempDT.Rows)
        {
            DataRow newRow = dt.NewRow();
            try
            {
                newRow["Date"] = DateTime.Parse(Convert.ToString(row["Date_Source"]));
                newRow["TargetDim1"] = Convert.ToString(row["TargetDim1_Source"]);
                newRow["TargetDim2"] = Convert.ToString(row["TargetDim2_Source"]);
                newRow["TargetDim3"] = Convert.ToString(row["TargetDim3_Source"]);
                newRow["Location"] = Convert.ToString(row["Location_Source"]);
                newRow["Value"] = Convert.ToDecimal(row["Value_Source"]);
            }
            catch (Exception)
            {
                throw new InvalidOperationException($"Error in type conversion from table {tableName} in PullDataTableFromSICConnection().");
            }
            dt.Rows.Add(newRow);
        }
        if (IsDataTablePopulated(dt))
        {
            return dt;
        }
        else
        {
            throw new InvalidOperationException($"{tableName} is empty or null after PullDataTableFromSICConnection().");
        }
    }
    catch (ArgumentNullException)
    {
        throw;
    }
    catch (InvalidOperationException)
    {
        throw;
    }
    catch (Exception)
    {
        throw new InvalidOperationException($"Unable to PullDataTableFromSICConnection() for {tableName} at {dbConnectionName}.");
    }
}
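For example, assuming a hypothetical SIC connection named "CustomerERP" and a source table "dbo.SalesActuals":
DataTable rawActuals = PullDataTableFromSICConnection(si, "CustomerERP", "dbo.SalesActuals");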
DataTable Validation - Is the table populated?
// Purpose: Logical test on if a DataTable object is populated (not null and rows exist)
// Parameters: DataTable dt - DataTable object to test on
// Return Values: Boolean true (if DataTable is populated) or false (if DataTable is null or has no rows)
// Exceptions: InvalidOperationException - General unable to complete
public static bool IsDataTablePopulated(DataTable dt)
{
    try
    {
        // Check for null before dereferencing Rows
        if ((dt == null) || (dt.Rows.Count == 0))
        {
            return false;
        }
        else
        {
            return true;
        }
    }
    catch (Exception)
    {
        throw new InvalidOperationException("Unable to check IsDataTablePopulated().");
    }
}
DataTable Validation - Does the table include all listed columns?
// Purpose: Logical test on if a DataTable object contains all the columns specified in the list
// Parameters: DataTable dt - DataTable object to test on
//             List<string> columnNames - list of column names to check
// Return Values: Boolean true if table contains all columns. Modifies out string missingColumn to reflect the missing column
// Exceptions: ArgumentNullException - Column list contains a null value, or dt is null
//             InvalidOperationException - General unable to complete
public static bool DoesDataTableIncludeColumns(DataTable dt, List<string> columnNames, out string missingColumn)
{
    try
    {
        if (dt == null)
        {
            throw new ArgumentNullException("dt", "Cannot DoesDataTableIncludeColumns() due to null table.");
        }
        foreach (string colName in columnNames)
        {
            if (colName == null)
            {
                throw new ArgumentNullException("columnNames", "Column List contains null value.");
            }
            if (!dt.Columns.Contains(colName))
            {
                missingColumn = colName;
                return false;
            }
        }
        missingColumn = string.Empty;
        return true;
    }
    catch (ArgumentNullException)
    {
        throw;
    }
    catch (Exception)
    {
        throw new InvalidOperationException("Unable to check DoesDataTableIncludeColumns().");
    }
}
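A validation call using the out parameter might look like this (the column names are illustrative):
string missingColumn = string.Empty;
List<string> requiredColumns = new List<string> { "Date", "TargetDim1", "Value" };
if (!DoesDataTableIncludeColumns(actualsTable, requiredColumns, out missingColumn))
{
    throw new InvalidOperationException($"Actuals table is missing expected column: {missingColumn}.");
}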
Actuals Table - Complete Transform
// Purpose: Transform an actuals table in the customer format to a SensibleAI Forecast format
// Parameters: string outputTableName - Output table name for post-transformed actuals
//             DataTable actualsTable - Unmodified actuals table as pulled from customer database
//             DataTable mappingTable - Table to perform joins with to incorporate additional field(s)
//             DateTime maxDate - Date to filter actuals up to (inclusive)
//             SessionInfo si - Var for error logging
// Return Values: DataTable outputTable - Post-transformed actuals table
public static DataTable CreateActualsTable(string outputTableName, DataTable actualsTable, DataTable mappingTable, DateTime maxDate, SessionInfo si)
{
    try
    {
        var validTargetDim1 = new HashSet<string>
        {
            "Golf Clubs", "Golf Balls", "Sporting Goods", "Clothing"
        };
        DateTime minDate = new DateTime(maxDate.AddYears(-8).Year, maxDate.Month, 1);
        // Filter Query:
        // 1) Filter for positive quantities
        // 2) Filter for TargetDim1 belonging to a specified set
        // 3) Create a DateTime value based on the customer's Year, Month, and Day columns
        // 4) Convert the PIT date to a WMon (week beginning Monday) date
        var filterQuery = from act in actualsTable.AsEnumerable()
                          where act.Field<decimal>("Value") > 0
                          where validTargetDim1.Contains(act.Field<string>("TargetDim1"))
                          let date = new DateTime(act.Field<int>("Year"), act.Field<int>("Month"), act.Field<int>("Day"))
                          where date <= maxDate
                          where date >= minDate
                          let dayOfWeek = date.DayOfWeek
                          let dayDiffFromMon = (7 + dayOfWeek - DayOfWeek.Monday) % 7
                          let mondayDate = date.AddDays(-dayDiffFromMon)
                          select new
                          {
                              Date = mondayDate,
                              TargetDim1 = act.Field<string>("TargetDim1"),
                              TargetDim2 = act.Field<string>("TargetDim2"),
                              TargetDim3 = act.Field<string>("TargetDim3"),
                              Value = act.Field<decimal>("Value")
                          };
        bool isResultEmpty = !filterQuery.Any();
        if (isResultEmpty)
        {
            throw new InvalidOperationException("In CreateActualsTable(), table is empty after filtering.");
        }
        // Aggregation Query:
        // 1) Inner join with the mapping table on Dim1 and Dim2 to add in the target Location
        // 2) Group by Date, Dim3, and Location and sum the Value
        var aggregationQuery = from row in filterQuery
                               join map in mappingTable.AsEnumerable()
                                   on new { TargetDim1 = row.TargetDim1, TargetDim2 = row.TargetDim2 }
                                   equals new { TargetDim1 = map.Field<string>("TargetDim1"), TargetDim2 = map.Field<string>("TargetDim2") }
                               select new
                               {
                                   Date = row.Date,
                                   TargetDim1 = row.TargetDim1,
                                   TargetDim2 = row.TargetDim2,
                                   TargetDim3 = row.TargetDim3,
                                   Location = map.Field<string>("Location"),
                                   Value = row.Value
                               }
                               into g
                               group g by new { g.Date, g.TargetDim3, g.Location }
                               into grouped
                               select new
                               {
                                   grouped.Key.Date,
                                   grouped.Key.TargetDim3,
                                   grouped.Key.Location,
                                   ValueSum = grouped.Sum(x => x.Value)
                               };
        isResultEmpty = !aggregationQuery.Any();
        if (isResultEmpty)
        {
            throw new InvalidOperationException("In CreateActualsTable(), table is empty after joins and aggregations.");
        }
        // Write to a DataTable
        DataTable outputTable = new DataTable(outputTableName);
        outputTable.Columns.Add("Date", typeof(DateTime));
        outputTable.Columns.Add("TargetDim3", typeof(string));
        outputTable.Columns.Add("Location", typeof(string));
        outputTable.Columns.Add("Value", typeof(decimal));
        foreach (var q in aggregationQuery)
        {
            DataRow newRow = outputTable.NewRow();
            newRow["Date"] = q.Date;
            newRow["TargetDim3"] = Convert.ToString(q.TargetDim3);
            newRow["Location"] = Convert.ToString(q.Location);
            newRow["Value"] = q.ValueSum;
            outputTable.Rows.Add(newRow);
        }
        if (IsDataTablePopulated(outputTable))
        {
            return outputTable;
        }
        else
        {
            throw new InvalidOperationException("In CreateActualsTable(), table is empty after writing to DataTable.");
        }
    }
    catch (InvalidOperationException)
    {
        throw;
    }
    catch (ArgumentNullException)
    {
        throw;
    }
    catch (ArgumentException)
    {
        throw;
    }
    catch (Exception)
    {
        throw new InvalidOperationException("Unable to CreateActualsTable().");
    }
}
Feature Table - Complete Transform
// Purpose: Transform two feature sets from the customer to a single SensibleAI Forecast table.
//          Furthermore, missing values are filled with the last known value.
// Parameters: string outputTableName - Output table name for post-transformed feature
//             DataTable featureTable1 - First time-series feature set
//             DataTable featureTable2 - Second time-series feature set
//             DateTime maxDate - Date to fill feature set up to
//             SessionInfo si - Var for error logging
// Return Values: DataTable outputTable - Post-transformed feature table
public static DataTable CreateFeatureTable(string outputTableName, DataTable featureTable1, DataTable featureTable2, DateTime maxDate, SessionInfo si)
{
    try
    {
        var feat1LatestDate = featureTable1.AsEnumerable().Max(row => row.Field<DateTime>("Date"));
        var feat2LatestDate = featureTable2.AsEnumerable().Max(row => row.Field<DateTime>("Date"));
        // Keep the original historical rows, then fill each feature forward from its last
        // known date through the max date, carrying the last known value (assuming monthly)
        var fillFeat1Query = featureTable1.AsEnumerable()
            .Select(row => new
            {
                FeatureName = "Feature1",
                Date = row.Field<DateTime>("Date"),
                Value = row.Field<decimal>("Value")
            })
            .Concat(Enumerable.Range(0, (maxDate.Year - feat1LatestDate.Year) * 12 + maxDate.Month - feat1LatestDate.Month)
                .Select(offset => maxDate.AddMonths(-offset))
                .Select(monthStart => new
                {
                    FeatureName = "Feature1",
                    Date = monthStart,
                    Value = featureTable1.AsEnumerable().Where(row => row.Field<DateTime>("Date") <= monthStart).OrderByDescending(row => row.Field<DateTime>("Date")).Select(row => row.Field<decimal>("Value")).FirstOrDefault()
                }));
        bool isResultEmpty = !fillFeat1Query.Any();
        if (isResultEmpty)
        {
            throw new InvalidOperationException("In CreateFeatureTable(), Feature1 table is empty after filling.");
        }
        var fillFeat2Query = featureTable2.AsEnumerable()
            .Select(row => new
            {
                FeatureName = "Feature2",
                Date = row.Field<DateTime>("Date"),
                Value = row.Field<decimal>("Value")
            })
            .Concat(Enumerable.Range(0, (maxDate.Year - feat2LatestDate.Year) * 12 + maxDate.Month - feat2LatestDate.Month)
                .Select(offset => maxDate.AddMonths(-offset))
                .Select(monthStart => new
                {
                    FeatureName = "Feature2",
                    Date = monthStart,
                    Value = featureTable2.AsEnumerable().Where(row => row.Field<DateTime>("Date") <= monthStart).OrderByDescending(row => row.Field<DateTime>("Date")).Select(row => row.Field<decimal>("Value")).FirstOrDefault()
                }));
        isResultEmpty = !fillFeat2Query.Any();
        if (isResultEmpty)
        {
            throw new InvalidOperationException("In CreateFeatureTable(), Feature2 table is empty after filling.");
        }
        // Concatenate the feature sets together as one table
        var combinedFeatQuery = fillFeat1Query.Concat(fillFeat2Query);
        isResultEmpty = !combinedFeatQuery.Any();
        if (isResultEmpty)
        {
            throw new InvalidOperationException("In CreateFeatureTable(), Feature table is empty after concatenating.");
        }
        // Write to a DataTable
        DataTable outputTable = new DataTable(outputTableName);
        outputTable.Columns.Add("FeatureName", typeof(string));
        outputTable.Columns.Add("Date", typeof(DateTime));
        outputTable.Columns.Add("Value", typeof(decimal));
        foreach (var q in combinedFeatQuery)
        {
            DataRow newRow = outputTable.NewRow();
            newRow["FeatureName"] = q.FeatureName;
            newRow["Date"] = q.Date;
            newRow["Value"] = q.Value;
            outputTable.Rows.Add(newRow);
        }
        if (IsDataTablePopulated(outputTable))
        {
            return outputTable;
        }
        else
        {
            throw new InvalidOperationException("In CreateFeatureTable(), table is empty after writing to DataTable.");
        }
    }
    catch (InvalidOperationException)
    {
        throw;
    }
    catch (ArgumentNullException)
    {
        throw;
    }
    catch (ArgumentException)
    {
        throw;
    }
    catch (Exception)
    {
        throw new InvalidOperationException("Unable to CreateFeatureTable().");
    }
}
Table Export - Write to AI Services Data Sources Database
// Writes DataTable outputTable to AI Services Data Sources Database.
// Overwrites existing table. This is best for production use cases where all
// tables in use should be current.
var xdtSolutionHelper = new DashboardExtender.AISXDT_SolutionHelper.MainClass();
xdtSolutionHelper.PersistDatabaseTable(si, "AI Services Data Sources", outputTable, BlendTableLoadTypes.DropAndRecreate, BlendTableIndexTypes.ClusteredColStore);
Exception Handling for Debugging
One should note that in all of the examples provided, everything is nested inside a series of try-catch statements, with each catch clause throwing or re-throwing exceptions with targeted error messages. In this way, if any error occurs when running the code, the error message will clearly state the function or task where it occurred. This allows for targeted debugging.
Exceptions should be thrown to catch the following circumstances that commonly yield errors:
Validations:
- Check if a table is empty or null after operations
- Check if a query is empty after operations
- Check if table or column names are unexpected or do not appear in the table
Operational Errors:
- Improper type casting when converting query objects to DataTable objects
- Wrap each function body inside a try-catch statement
- Wrap iterations over a query that write it into a DataTable inside a try-catch statement to isolate the specific row the error occurred on. By writing the row values in the error message, the user can identify the exact value that caused the exception. Typically, these are caused by a type mismatch (see the sketch after this list).
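As a sketch of that row-level isolation pattern (the column names here are illustrative), the offending values can be embedded directly in the error message:
// Write each query row into the DataTable, isolating conversion errors per row
foreach (DataRow row in tempDT.Rows)
{
    DataRow newRow = dt.NewRow();
    try
    {
        newRow["Date"] = DateTime.Parse(Convert.ToString(row["Date_Source"]));
        newRow["Value"] = Convert.ToDecimal(row["Value_Source"]);
    }
    catch (Exception)
    {
        // Report the row's values so the bad entry can be found in the source table
        throw new InvalidOperationException($"Type conversion failed on row with Date_Source = '{row["Date_Source"]}' and Value_Source = '{row["Value_Source"]}'.");
    }
    dt.Rows.Add(newRow);
}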
Other Considerations
The most important thing to consider is that a DataTable has a 16,777,216 row count limitation. This comes into play when working with daily frequency data for large target counts. For example, 15,000 targets with three years of daily data reaches this maximum limit. This does not even consider cases where actuals are transactional data and a single target may have multiple records for a single day. In these cases, data must be transformed and saved in a batch approach. For example, consider loading the actuals in from the source database in segments by modifying the SQL query to operate on specific target categories or a specific date range.
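Here is a minimal sketch of that batching approach (the connection name, table name, and year range are hypothetical): the SQL query is parameterized by year so that each batch stays well under the row limit, and each batch is persisted before the next is pulled.
// Hypothetical batched extraction: pull and persist one year of actuals at a time
var xdtSolutionHelper = new DashboardExtender.AISXDT_SolutionHelper.MainClass();
for (int year = 2018; year <= 2024; year++)
{
    StringBuilder sqlCall = new StringBuilder();
    sqlCall.Append("SELECT Date_Source, TargetDim1_Source, Value_Source ");
    sqlCall.Append("FROM dbo.SalesActuals ");
    sqlCall.Append($"WHERE YEAR(Date_Source) = {year} ");
    DataTable batchDT = BRApi.Database.ExecuteSql(BRApi.Database.CreateExternalDbConnInfo(si, "CustomerERP"), sqlCall.ToString(), false);
    batchDT.TableName = $"SML_Actuals_{year}";
    // Transform this batch as needed, then persist it before loading the next one
    xdtSolutionHelper.PersistDatabaseTable(si, "AI Services Data Sources", batchDT, BlendTableLoadTypes.DropAndRecreate, BlendTableIndexTypes.ClusteredColStore);
}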
Next, the biggest source of exceptions when debugging these statements is type casting: converting from DataTable objects to LINQ queries, cascading LINQ queries, and converting back to DataTable objects. In the provided examples, at almost every step there is an explicit conversion statement to ensure the field is the expected data type. Being explicit and thorough at every step reduces the chances of this type of error.
Finally, design the ingress statement to leverage user inputs. For example, the customer might have multiple projects in production and want to specify which actuals and feature tables get updated based on the planning period. Alternatively, if data is continuously updated in the source database, the customer might want to specify the intended forecast start date so that entries after this date are cut off. These inputs can be accessed using the ExtenderArgs args parameter in the Main object of the Business Rule. If the designer creates the Data Management Step with the appropriate parameters, these can be accessed in the Business Rule with:
// param_UserInput is a string parameter specified in the Data Management Step
string userInput = string.Empty;
args.NameValuePairs.TryGetValue("param_UserInput", out userInput);
Again, note how the TryGetValue method is used to check whether param_UserInput exists in the ExtenderArgs object before it is used. In this way, the Ingress designer can control exception handling and build robust ways of handling unexpected data. At every step of the Ingress Statement, identify what could go wrong and build safeguards.
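Building on the forecast start date example above, the boolean returned by TryGetValue can gate the cut-off logic (param_ForecastStart and maxDate are hypothetical names for this sketch):
// Hypothetical: cut actuals off before an intended forecast start date supplied by the user
string rawForecastStart = string.Empty;
if (args.NameValuePairs.TryGetValue("param_ForecastStart", out rawForecastStart))
{
    DateTime forecastStart;
    if (DateTime.TryParse(rawForecastStart, out forecastStart))
    {
        // Actuals should end the period before the forecast begins
        maxDate = forecastStart.AddDays(-1);
    }
}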
Lastly, a well-designed Business Rule has extensive comments that explain the purpose behind code blocks. All functions should be commented with their purpose, inputs, and outputs, and the Main method should be commented to align with the flow diagram.