
Step 5: analyze() Function - Part 2 (Analysis Logic)

Implement statistical analysis and anomaly detection.


In this step, you'll implement the analysis logic: calculating statistics and processing the data.

Goal of This Step

  • Calculate temperature differential
  • Compute statistical metrics
  • Detect anomalies
  • Add time-based features
  • Complete the analyze() function

Step 5.1: Add Analysis Section

After Step 4 (data loading and extraction), add the analysis logic to the analyze() function:

Add this code to app.py after the time filtering section:

    # Step 6: Analyze data
    print(f"{'='*60}")
    print(f"STEP 6: Analyze Data")
    print(f"{'='*60}\n")

    # Calculate temperature differential
    df_extracted["temp_diff"] = df_extracted["supply"] - df_extracted["return"]

    # Remove rows with missing data
    df_clean = df_extracted.dropna().copy()

    print(f"Valid data points: {len(df_clean)} (after removing NaN values)")

    if len(df_clean) == 0:
        print("[FAIL] No valid data after cleaning\n")
        return None

Understanding:

  • df["supply"] - df["return"] calculates the temperature differential row by row
  • dropna() removes any rows with missing values
  • .copy() creates an independent copy to avoid pandas SettingWithCopyWarning
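
If you want to see this cleaning step in isolation, here is a minimal standalone sketch (the temperature values are invented for illustration):

    import numpy as np
    import pandas as pd

    # Toy stand-in for df_extracted: one return reading is missing
    df_extracted = pd.DataFrame({
        "supply": [80.0, 78.5, 81.2],
        "return": [60.0, np.nan, 61.5],
    })

    df_extracted["temp_diff"] = df_extracted["supply"] - df_extracted["return"]
    df_clean = df_extracted.dropna().copy()  # drops the row with the NaN return

    print(len(df_extracted), "->", len(df_clean))  # 3 -> 2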


Step 5.2: Calculate Statistics

Add statistical calculations:

    # Get thresholds from config
    threshold_min = config["analysis"]["threshold_min_delta"]
    threshold_max = config["analysis"]["threshold_max_delta"]

    # Calculate statistics
    stats = {
        "count": len(df_clean),
        "mean_temp_diff": df_clean["temp_diff"].mean(),
        "std_temp_diff": df_clean["temp_diff"].std(),
        "min_temp_diff": df_clean["temp_diff"].min(),
        "max_temp_diff": df_clean["temp_diff"].max(),
        "median_temp_diff": df_clean["temp_diff"].median(),
        "q25_temp_diff": df_clean["temp_diff"].quantile(0.25),
        "q75_temp_diff": df_clean["temp_diff"].quantile(0.75),
        "mean_supply_temp": df_clean["supply"].mean(),
        "mean_return_temp": df_clean["return"].mean(),
    }

    print(f"\nStatistics:")
    print(f"  Mean temp diff:   {stats['mean_temp_diff']:.2f} °C")
    print(f"  Std deviation:    {stats['std_temp_diff']:.2f} °C")
    print(f"  Range:            [{stats['min_temp_diff']:.2f}, {stats['max_temp_diff']:.2f}] °C")
    print(f"  Median:           {stats['median_temp_diff']:.2f} °C")

Understanding Statistics:

  • mean() - Average value
  • std() - Standard deviation (spread)
  • min() / max() - Minimum and maximum values
  • median() - Middle value (50th percentile)
  • quantile(0.25) - 25th percentile (Q1)
  • quantile(0.75) - 75th percentile (Q3)
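
To see what these methods return, here is a quick toy example (the values are invented; expected outputs are in the comments):

    import pandas as pd

    temp_diff = pd.Series([18.0, 20.0, 21.0, 22.0, 24.0])

    print(temp_diff.mean())          # 21.0
    print(temp_diff.std())           # ~2.24 (sample std, ddof=1)
    print(temp_diff.median())        # 21.0
    print(temp_diff.quantile(0.25))  # 20.0 (Q1)
    print(temp_diff.quantile(0.75))  # 22.0 (Q3)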


Step 5.3: Anomaly Detection

Detect values outside normal range:

    # Detect anomalies
    anomalies_low = df_clean[df_clean["temp_diff"] < threshold_min]
    anomalies_high = df_clean[df_clean["temp_diff"] > threshold_max]

    stats["anomalies_below_threshold"] = len(anomalies_low)
    stats["anomalies_above_threshold"] = len(anomalies_high)
    stats["anomaly_rate"] = (
        (stats["anomalies_below_threshold"] + stats["anomalies_above_threshold"])
        / stats["count"]
        * 100
    )

    print(f"  Anomalies (low):  {stats['anomalies_below_threshold']} "
          f"({stats['anomalies_below_threshold']/stats['count']*100:.1f}%)")
    print(f"  Anomalies (high): {stats['anomalies_above_threshold']} "
          f"({stats['anomalies_above_threshold']/stats['count']*100:.1f}%)")
    print(f"  Total anomalies:  {stats['anomaly_rate']:.2f}%")

Understanding Anomaly Detection:

  • df[df["column"] < value] - Filter rows below the threshold
  • df[df["column"] > value] - Filter rows above the threshold
  • Anomaly rate = (total anomalies / total points) × 100
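
Here is the same masking logic on toy data (thresholds and values invented for illustration):

    import pandas as pd

    df = pd.DataFrame({"temp_diff": [2.0, 15.0, 16.0, 35.0]})
    threshold_min, threshold_max = 10.0, 20.0

    anomalies_low = df[df["temp_diff"] < threshold_min]   # one row: 2.0
    anomalies_high = df[df["temp_diff"] > threshold_max]  # one row: 35.0

    anomaly_rate = (len(anomalies_low) + len(anomalies_high)) / len(df) * 100
    print(f"{anomaly_rate:.1f}%")  # 50.0%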


Step 5.4: Add Time Features

Add time-based columns for analysis:

    # Add time-based features for visualization
    df_clean.loc[:, "hour"] = df_clean.index.hour
    df_clean.loc[:, "weekday"] = df_clean.index.dayofweek
    df_clean.loc[:, "month"] = df_clean.index.month

    print(f"\n[OK] Analysis complete!")

Understanding Time Features:

  • index.hour - Hour of day (0-23)
  • index.dayofweek - Day of week (0=Monday, 6=Sunday)
  • index.month - Month (1-12)
  • These columns drive the time-pattern analysis in the visualizations
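
A small sketch of how a DatetimeIndex exposes these attributes (the dates are invented; 2024-01-01 happens to fall on a Monday):

    import pandas as pd

    # Hourly index over two days, standing in for df_clean's DatetimeIndex
    idx = pd.date_range("2024-01-01", periods=48, freq="h")
    df = pd.DataFrame({"temp_diff": range(48)}, index=idx)

    df.loc[:, "hour"] = df.index.hour          # 0, 1, ..., 23, 0, 1, ...
    df.loc[:, "weekday"] = df.index.dayofweek  # 0 (Mon) on day one, 1 (Tue) on day two
    df.loc[:, "month"] = df.index.month        # 1 (January) throughout

    print(df[["hour", "weekday", "month"]].iloc[[0, 23, 24]])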


Step 5.5: Return Results

Complete the analyze() function by returning results:

    # Create results dictionary
    results = {
        "stats": stats,
        "data": df_clean
    }

    # Step 7: Output (save and visualize)
    if config["output"]["save_results"]:
        save_results(results, config)

    if config["output"]["generate_plots"]:
        generate_plots(results, config)

    if config["output"]["generate_plotly_html"]:
        generate_plotly_html(results, config)

    return results

Step 5.6: Implement save_results()

Add the function to save analysis results:

def save_results(results, config):
    """Save analysis results to CSV or JSON"""
    output_dir = Path(config["output"]["output_dir"])
    output_dir.mkdir(parents=True, exist_ok=True)
    export_format = config["output"]["export_format"]

    print(f"\n{'='*60}")
    print(f"SAVE: Saving results to {output_dir}")
    print(f"{'='*60}\n")

    # Save statistics
    stats_file = output_dir / f"stats.{export_format}"
    stats_df = pd.DataFrame([results["stats"]])

    if export_format == "csv":
        stats_df.to_csv(stats_file, index=False)
    else:  # json
        stats_df.to_json(stats_file, orient="records", indent=2)

    print(f"  [OK] {stats_file.name}")

    # Save timeseries data
    ts_file = output_dir / f"timeseries.{export_format}"

    if export_format == "csv":
        results["data"].to_csv(ts_file)
    else:  # json
        results["data"].to_json(ts_file, orient="index", date_format="iso")

    print(f"  [OK] {ts_file.name}")

Step 5.7: Complete analyze() Function

Your complete analyze() function should look like this (the progress prints from Steps 5.1-5.4 are trimmed for brevity):

def analyze(brick_model_path, timeseries_data_path, config):
    """Execute analysis workflow"""

    # Step 1: Qualify
    qualified, qualify_result = qualify(brick_model_path)
    if not qualified:
        return None

    # Step 2: Load data
    g, df = load_data(brick_model_path, timeseries_data_path)

    # Step 3: Map sensors
    supply_uri = qualify_result["supply"]
    return_uri = qualify_result["return"]
    sensor_mapping = map_sensors_to_columns(g, [supply_uri, return_uri], df)

    if len(sensor_mapping) != 2:
        print("[FAIL] Sensor mapping failed\n")
        return None

    # Step 4: Extract data
    df_extracted = extract_data_columns(
        df, sensor_mapping,
        rename_map={supply_uri: "supply", return_uri: "return"}
    )

    # Step 5: Filter time range (optional)
    if config["time_range"]["start_time"] or config["time_range"]["end_time"]:
        df_extracted = filter_time_range(
            df_extracted,
            config["time_range"]["start_time"],
            config["time_range"]["end_time"]
        )

    # Step 6: Analyze
    df_extracted["temp_diff"] = df_extracted["supply"] - df_extracted["return"]
    df_clean = df_extracted.dropna().copy()

    if len(df_clean) == 0:
        return None

    # Calculate statistics
    threshold_min = config["analysis"]["threshold_min_delta"]
    threshold_max = config["analysis"]["threshold_max_delta"]

    stats = {
        "count": len(df_clean),
        "mean_temp_diff": df_clean["temp_diff"].mean(),
        "std_temp_diff": df_clean["temp_diff"].std(),
        "min_temp_diff": df_clean["temp_diff"].min(),
        "max_temp_diff": df_clean["temp_diff"].max(),
        "median_temp_diff": df_clean["temp_diff"].median(),
        "q25_temp_diff": df_clean["temp_diff"].quantile(0.25),
        "q75_temp_diff": df_clean["temp_diff"].quantile(0.75),
        "mean_supply_temp": df_clean["supply"].mean(),
        "mean_return_temp": df_clean["return"].mean(),
        "anomalies_below_threshold": len(df_clean[df_clean["temp_diff"] < threshold_min]),
        "anomalies_above_threshold": len(df_clean[df_clean["temp_diff"] > threshold_max]),
    }
    stats["anomaly_rate"] = (
        (stats["anomalies_below_threshold"] + stats["anomalies_above_threshold"])
        / stats["count"] * 100
    )

    # Add time features
    df_clean.loc[:, "hour"] = df_clean.index.hour
    df_clean.loc[:, "weekday"] = df_clean.index.dayofweek
    df_clean.loc[:, "month"] = df_clean.index.month

    # Create results
    results = {"stats": stats, "data": df_clean}

    # Step 7: Output
    if config["output"]["save_results"]:
        save_results(results, config)

    if config["output"]["generate_plots"]:
        generate_plots(results, config)

    if config["output"]["generate_plotly_html"]:
        generate_plotly_html(results, config)

    return results

Step 5.8: Test the Analysis

Create a test script:

"""Test complete analyze function"""
from pathlib import Path
import sys

app_dir = Path(__file__).parent
sys.path.insert(0, str(app_dir.parent.parent.parent))

from hhw_brick.applications.my_first_app.app import analyze, load_config

def test_analyze():
    """Test complete analysis"""
    print("Testing complete analyze function...\n")

    fixtures = Path(__file__).parent.parent.parent.parent / "tests" / "fixtures"
    model_file = fixtures / "Brick_Model_File" / "building_29.ttl"
    data_file = fixtures / "TimeSeriesData" / "29hhw_system_data.csv"

    if not model_file.exists() or not data_file.exists():
        print("Test files not found")
        return

    config = load_config()
    config["output"]["output_dir"] = "./test_results"

    results = analyze(str(model_file), str(data_file), config)

    if results:
        print(f"\n{'='*60}")
        print("✅ Analysis complete!")
        print(f"{'='*60}")
        print(f"\nStatistics:")
        for key, value in results["stats"].items():
            if isinstance(value, float):
                print(f"  {key}: {value:.2f}")
            else:
                print(f"  {key}: {value}")

        print(f"\nData shape: {results['data'].shape}")
        print(f"Columns: {list(results['data'].columns)}")
    else:
        print("\n❌ Analysis failed")

if __name__ == "__main__":
    test_analyze()

Checkpoint

Before proceeding, verify:

  • Temperature differential calculated
  • Statistics computed (mean, std, min, max, etc.)
  • Anomaly detection implemented
  • Time features added
  • save_results() function created
  • analyze() returns results dictionary
  • Test script runs successfully

Next Steps

✅ Analysis logic complete!

👉 Continue to Step 6: Matplotlib Visualization


Common Issues

Issue: KeyError: 'threshold_min_delta'
Solution: Make sure your config.yaml has an analysis section defining threshold_min_delta and threshold_max_delta (a defensive fallback is sketched after this list)

Issue: All values are NaN after dropna()
Solution: Check if your sensors are correctly mapped and have valid data

Issue: Division by zero in anomaly rate
Solution: Compute the rate only when stats['count'] > 0 (see the sketch after this list)

Issue: SettingWithCopyWarning from pandas
Solution: Use .copy() when creating df_clean and .loc[] for assignments
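
For the KeyError and division-by-zero issues above, here is a defensive variant of the threshold lookup and rate calculation inside analyze(). It assumes load_config() returns a plain dict; the 10.0/20.0 fallbacks are illustrative assumptions, not values from this tutorial:

    # Fall back to hypothetical defaults if the analysis section is missing
    analysis_cfg = config.get("analysis", {})
    threshold_min = analysis_cfg.get("threshold_min_delta", 10.0)
    threshold_max = analysis_cfg.get("threshold_max_delta", 20.0)

    # Compute the anomaly rate only when there is data to divide by
    if stats["count"] > 0:
        stats["anomaly_rate"] = (
            (stats["anomalies_below_threshold"] + stats["anomalies_above_threshold"])
            / stats["count"] * 100
        )
    else:
        stats["anomaly_rate"] = 0.0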


Summary

You've now completed the core analysis logic:

  • Data Processing: Calculate temperature differential
  • Statistics: Mean, std, min, max, quantiles
  • Anomaly Detection: Find values outside thresholds
  • Time Features: Add hour, weekday, month columns
  • Save Results: Export to CSV or JSON

Next step: Create visualizations!