Step 4: analyze Function - Part 1 (Data Loading)¶
In this step, you'll implement the first part of the analyze() function, focusing on loading and preparing data.
Goal of This Step¶
- Load Brick model and time-series data
- Map sensors to data columns
- Extract and filter relevant data
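At a high level, this part of analyze() chains four hhw_brick utility calls. The sketch below previews the flow built in Steps 4.1 through 4.6 (variable names are illustrative; each call is explained in its own step):
# Preview of the data-preparation pipeline (illustrative only)
g, df = load_data(brick_model_path, timeseries_data_path)
sensor_mapping = map_sensors_to_columns(g, [supply_uri, return_uri], df)
df_extracted = extract_data_columns(
    df, sensor_mapping,
    rename_map={supply_uri: "supply", return_uri: "return"},
)
df_extracted = filter_time_range(df_extracted, start_time, end_time)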
Step 4.1: Add Required Imports¶
Add these imports to the top of your app.py (after existing imports):
import pandas as pd
import numpy as np
# Import HHW Brick utilities
from hhw_brick.utils import (
load_data,
map_sensors_to_columns,
extract_data_columns,
filter_time_range,
)
Step 4.2: Start the analyze() Function¶
Add the basic structure of the analyze() function:
def analyze(brick_model_path, timeseries_data_path, config):
"""
Execute analysis workflow
Args:
brick_model_path (str|Path): Path to Brick model file (.ttl)
timeseries_data_path (str|Path): Path to time-series data (.csv)
config (dict): Configuration dictionary from load_config()
Returns:
dict: Analysis results with 'stats' and 'data' keys, or None if analysis fails
Example:
>>> config = load_config()
>>> results = analyze("model.ttl", "data.csv", config)
>>> if results:
... print(f"Mean: {results['stats']['mean']}")
... print(f"Data shape: {results['data'].shape}")
"""
# Step 1: Qualify building
print(f"\n{'='*60}")
print(f"STEP 1: Qualification")
print(f"{'='*60}")
qualified, qualify_result = qualify(brick_model_path)
if not qualified:
print("[FAIL] Building not qualified. Analysis aborted.\n")
return None
print("[OK] Building qualified. Proceeding with analysis.\n")
# Steps 2-5 will be added below...
Step 4.3: Load Data¶
Add data loading logic:
# Step 2: Load data
print(f"{'='*60}")
print(f"STEP 2: Load Data")
print(f"{'='*60}\n")
# Load both Brick model and time-series data
g, df = load_data(brick_model_path, timeseries_data_path)
print(f"[OK] Loaded {len(df)} data points")
print(f"[OK] Time range: {df.index.min()} to {df.index.max()}")
print(f"[OK] Columns: {list(df.columns)}\n")
Understanding load_data():
- Loads Brick model as RDF graph (g)
- Loads CSV as pandas DataFrame (df)
- Automatically sets datetime index
- Returns both objects as tuple
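For intuition, here is a minimal sketch of what an equivalent loader could look like. This is hypothetical, not the actual hhw_brick implementation; it only assumes rdflib for the graph and pandas for the CSV:
# Hypothetical sketch of a load_data() equivalent
import pandas as pd
from rdflib import Graph

def load_data_sketch(brick_model_path, timeseries_data_path):
    g = Graph()
    g.parse(brick_model_path, format="turtle")  # Brick model as RDF graph
    # index_col + parse_dates give the DataFrame its datetime index
    df = pd.read_csv(timeseries_data_path, index_col=0, parse_dates=True)
    return g, df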
Step 4.4: Map Sensors to Columns¶
Time-series data identifies sensors by column name (e.g. "temp_supply"), while Brick models identify them by URI (e.g. building:sensor1). We need to map between the two:
# Step 3: Map sensors to data columns
print(f"{'='*60}")
print(f"STEP 3: Map Sensors to Data")
print(f"{'='*60}\n")
# Get sensor URIs from qualification result
supply_uri = qualify_result["supply"]
return_uri = qualify_result["return"]
print(f"Looking for sensors:")
print(f" Supply: {supply_uri}")
print(f" Return: {return_uri}\n")
# Map sensor URIs to column names in DataFrame
sensor_mapping = map_sensors_to_columns(
g, # Brick model graph
[supply_uri, return_uri], # Sensors to find
df # DataFrame with column names
)
# Verify we found both sensors
if len(sensor_mapping) != 2:
print(f"[FAIL] Failed to map sensors to data columns")
print(f" Expected 2 sensors, found {len(sensor_mapping)}\n")
return None
print(f"[OK] Sensors mapped successfully:")
for uri, col in sensor_mapping.items():
print(f" {uri.split('#')[-1]} -> {col}")
print()
Understanding map_sensors_to_columns():
- Uses the Brick model to find brick:hasLabel or brick:timeseries properties
- Matches sensor URIs to CSV column names
- Returns dictionary: {sensor_uri: column_name}
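As a rough illustration, a simplified mapper could compare each sensor's label against the DataFrame's columns. This sketch is hypothetical and assumes labels are stored as rdfs:label; the real utility also follows the Brick properties listed above:
# Simplified sketch, not the real implementation
from rdflib import URIRef
from rdflib.namespace import RDFS

def map_sensors_sketch(g, sensor_uris, df):
    mapping = {}
    for uri in sensor_uris:
        for label in g.objects(URIRef(uri), RDFS.label):
            if str(label) in df.columns:
                mapping[uri] = str(label)  # {sensor_uri: column_name}
    return mapping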
Step 4.5: Extract Relevant Data¶
Extract only the columns we need and rename them:
# Step 4: Extract and prepare data
print(f"{'='*60}")
print(f"STEP 4: Extract Data")
print(f"{'='*60}\n")
# Extract sensor data and rename columns
df_extracted = extract_data_columns(
df,
sensor_mapping,
rename_map={
supply_uri: "supply", # Rename to friendly name
return_uri: "return" # Rename to friendly name
}
)
print(f"[OK] Extracted {len(df_extracted)} rows")
print(f"[OK] Columns: {list(df_extracted.columns)}\n")
Understanding extract_data_columns():
- Extracts specific columns from the DataFrame
- Renames them to friendly names
- Returns a new DataFrame with only the relevant data
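Conceptually, the extraction boils down to a column selection plus a rename. A minimal sketch (hypothetical; assumes every URI in rename_map is present in sensor_mapping):
def extract_data_columns_sketch(df, sensor_mapping, rename_map):
    # Translate {uri: friendly_name} into {column_name: friendly_name}
    cols = {sensor_mapping[uri]: name for uri, name in rename_map.items()}
    # Keep only the mapped columns, then rename them
    return df[list(cols)].rename(columns=cols)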
Step 4.6: Filter Time Range (Optional)¶
If user specified a time range in config, filter the data:
# Step 5: Filter time range (optional)
if config["time_range"]["start_time"] or config["time_range"]["end_time"]:
print(f"{'='*60}")
print(f"STEP 5: Filter Time Range")
print(f"{'='*60}\n")
start = config["time_range"]["start_time"]
end = config["time_range"]["end_time"]
print(f"Filtering to: {start} to {end}")
df_extracted = filter_time_range(df_extracted, start, end)
print(f"[OK] Filtered to {len(df_extracted)} rows\n")
# Return prepared data (analysis logic will be added in next step)
return {
"stats": {}, # Will be filled in Step 5
"data": df_extracted
}
Step 4.7: Complete analyze() So Far¶
Your analyze() function should now look like this:
def analyze(brick_model_path, timeseries_data_path, config):
"""Execute analysis workflow"""
# Step 1: Qualify
print(f"\n{'='*60}")
print(f"STEP 1: Qualification")
print(f"{'='*60}")
qualified, qualify_result = qualify(brick_model_path)
if not qualified:
return None
print("[OK] Qualified\n")
# Step 2: Load data
print(f"{'='*60}")
print(f"STEP 2: Load Data")
print(f"{'='*60}\n")
g, df = load_data(brick_model_path, timeseries_data_path)
print(f"[OK] Loaded {len(df)} data points\n")
# Step 3: Map sensors
print(f"{'='*60}")
print(f"STEP 3: Map Sensors")
print(f"{'='*60}\n")
supply_uri = qualify_result["supply"]
return_uri = qualify_result["return"]
sensor_mapping = map_sensors_to_columns(g, [supply_uri, return_uri], df)
if len(sensor_mapping) != 2:
print("[FAIL] Sensor mapping failed\n")
return None
print("[OK] Sensors mapped\n")
# Step 4: Extract data
print(f"{'='*60}")
print(f"STEP 4: Extract Data")
print(f"{'='*60}\n")
df_extracted = extract_data_columns(
df, sensor_mapping,
rename_map={supply_uri: "supply", return_uri: "return"}
)
print(f"[OK] Data extracted\n")
# Step 5: Filter time range (optional)
if config["time_range"]["start_time"] or config["time_range"]["end_time"]:
df_extracted = filter_time_range(
df_extracted,
config["time_range"]["start_time"],
config["time_range"]["end_time"]
)
print(f"[OK] Time filtered\n")
# Placeholder return (will be completed in Step 5)
return {
"stats": {},
"data": df_extracted
}
Step 4.8: Test Data Loading¶
Create a test to verify data loading works:
Create test_analyze_part1.py:
"""
Test data loading part of analyze function
"""
from pathlib import Path
import sys
app_dir = Path(__file__).parent
sys.path.insert(0, str(app_dir.parent.parent.parent))
from hhw_brick.applications.my_first_app.app import analyze, load_config
def test_data_loading():
"""Test data loading steps"""
print("Testing data loading...\n")
# Use test fixtures
fixtures = Path(__file__).parent.parent.parent.parent / "tests" / "fixtures"
# Find a qualified building
model_file = fixtures / "Brick_Model_File" / "building_105_non-condensing_h.ttl"
data_file = fixtures / "TimeSeriesData" / "105hhw_system_data.csv"
if not model_file.exists() or not data_file.exists():
print("Test files not found. Skipping test.")
return
# Load config
config = load_config()
# Run analysis (only data loading part)
print(f"Testing with: {model_file.name}\n")
results = analyze(str(model_file), str(data_file), config)
if results:
print(f"\n{'='*60}")
print("✅ Data loading successful!")
print(f"{'='*60}")
print(f"Data shape: {results['data'].shape}")
print(f"Columns: {list(results['data'].columns)}")
print(f"First 5 rows:")
print(results['data'].head())
else:
print("\n❌ Data loading failed")
if __name__ == "__main__":
test_data_loading()
Run the test:
python test_analyze_part1.py
Checkpoint¶
Before proceeding, verify:
- analyze() function exists
- Qualification step works
- Data loading succeeds
- Sensor mapping finds sensors
- Data extraction returns DataFrame
- Optional time filtering works
- Test script runs successfully
Next Steps¶
✅ Data loading complete!
👉 Continue to Step 5: analyze Function - Part 2 (Analysis Logic)
Common Issues¶
Issue: ModuleNotFoundError: No module named 'hhw_brick.utils'
Solution: Make sure the sys.path.insert(0, ...) call points to the repository root and runs before the hhw_brick import
Issue: KeyError when accessing sensor URIs
Solution: Verify qualify() returns the expected "supply" and "return" keys in its result dict
Issue: sensor_mapping is empty
Solution:
- Check if CSV column names match Brick model labels
- Ensure sensors have brick:hasLabel or brick:timeseries properties
Issue: df_extracted has NaN values
Solution: This is normal; we'll handle missing data in Step 5
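If you want to inspect those gaps before Step 5, a quick pandas check works:
# Count missing values per column (purely diagnostic)
print(df_extracted.isna().sum())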
Understanding the HHW Brick Utilities¶
load_data()¶
Returns:
- g: RDF graph (rdflib.Graph) with Brick model
- df: pandas DataFrame with time-series data (datetime index)
map_sensors_to_columns()¶
Returns: Dict mapping sensor URIs to column names
extract_data_columns()¶
Returns: DataFrame with extracted and renamed columns
filter_time_range()¶
Parameters:
- start_time: String "YYYY-MM-DD" or None
- end_time: String "YYYY-MM-DD" or None
Returns: Filtered DataFrame
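Under the hood this is essentially a slice on the datetime index. A minimal sketch of an equivalent, assuming df carries the datetime index set up by load_data():
def filter_time_range_sketch(df, start_time=None, end_time=None):
    # .loc slicing on a datetime index; a None bound leaves that side open
    return df.loc[start_time:end_time]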