From b19fa54e38c0b01f7b6f59ccedfce4de76a3005f Mon Sep 17 00:00:00 2001
From: Aida Nikkhah Nasab <aida.nikkhah-nasab@stud.th-deg.de>
Date: Tue, 11 Mar 2025 22:46:28 +0100
Subject: [PATCH] add FFT-based temporal analysis pipeline with custom
 visualization for beacon data

---
 ...relation_beacons - Copy.py:Zone.Identifier |   0
 .../FFT_AutoCorrelation_beacons.py            | 236 ++++++++++++++++++
 ...n.py => FFT_AutoCorrelation_candidates.py} |  31 +--
 3 files changed, 252 insertions(+), 15 deletions(-)
 create mode 100644 Codes/FTT_autocorrelation/FFT_AutoCorrelation_beacons - Copy.py:Zone.Identifier
 create mode 100644 Codes/FTT_autocorrelation/FFT_AutoCorrelation_beacons.py
 rename Codes/FTT_autocorrelation/{FFT_AutoCorrelation.py => FFT_AutoCorrelation_candidates.py} (86%)

diff --git a/Codes/FTT_autocorrelation/FFT_AutoCorrelation_beacons - Copy.py:Zone.Identifier b/Codes/FTT_autocorrelation/FFT_AutoCorrelation_beacons - Copy.py:Zone.Identifier
new file mode 100644
index 0000000..e69de29
diff --git a/Codes/FTT_autocorrelation/FFT_AutoCorrelation_beacons.py b/Codes/FTT_autocorrelation/FFT_AutoCorrelation_beacons.py
new file mode 100644
index 0000000..dd40e3b
--- /dev/null
+++ b/Codes/FTT_autocorrelation/FFT_AutoCorrelation_beacons.py
@@ -0,0 +1,236 @@
+"""
+Advanced Signal Analysis Pipeline with Custom Color Scheme
+"""
+from dataclasses import dataclass
+from typing import Dict, List, Optional, Tuple
+import numpy as np
+import matplotlib.pyplot as plt
+from scipy.fft import fft, fftfreq
+from scipy import signal
+from influxdb_client import InfluxDBClient
+import math
+import logging
+import re
+
+# Configure logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+@dataclass
+class AnalysisConfig:
+    influx_url: str = "http://localhost:8086"
+    influx_token: str = "WUxftnono0_k_t620srsO7xNG15xcej5meoShrr1ONHGvWSEqwg3gJVhthKwux7wUyw1_1hm9TAQFWKeEBHK2g=="
+    influx_org: str = "Student"
+    sampling_rate: float = 1.0
+    bandpass_low_period: int = 1860
+    bandpass_high_period: int = 5
+    permutation_iterations: int = 100
+    confidence_level: float = 0.99
+    acf_top_peaks: int = 20
+    frequency_tolerance: float = 0.05
+
+class TemporalAnalyzer:
+    def __init__(self, config: AnalysisConfig):
+        self.config = config
+        self._setup_bandpass_filter()
+        
+    def _apply_filter(self, data: np.ndarray) -> np.ndarray:
+        """Zero-phase bandpass filtering"""
+        return signal.filtfilt(self.b, self.a, data)
+        
+    def _setup_bandpass_filter(self) -> None:
+        nyquist = 0.5 * self.config.sampling_rate
+        low_freq = 1 / self.config.bandpass_low_period
+        high_freq = 1 / self.config.bandpass_high_period
+        self.b, self.a = signal.butter(3, [low_freq/nyquist, high_freq/nyquist], 'band')
+
+    def fetch_temporal_data(self, bucket: str, measurement: str, 
+                           filter_field: str, filter_value: str) -> Optional[Tuple[np.ndarray, np.ndarray]]:
+        try:
+            with InfluxDBClient(self.config.influx_url, self.config.influx_token, org=self.config.influx_org) as client:
+                query = f'''
+                from(bucket: "{bucket}")
+                  |> range(start: 2023-08-01T00:00:00Z, stop: 2023-08-02T00:00:00Z)
+                  |> filter(fn: (r) => r["_measurement"] == "{measurement}")
+                  |> filter(fn: (r) => r["{filter_field}"] == "{filter_value}")
+                  |> keep(columns: ["_time"])
+                '''
+                result = client.query_api().query(query)
+                
+                timestamps = [record.get_time() for table in result for record in table.records]
+                if not timestamps:
+                    logger.warning(f"No data for {filter_value}")
+                    return None
+                
+                start_time = timestamps[0]
+                total_seconds = (timestamps[-1] - start_time).total_seconds()
+                time_grid = np.arange(0, total_seconds + 1, 1/self.config.sampling_rate)
+                values = np.zeros_like(time_grid, dtype=np.float32)
+                
+                for ts in timestamps:
+                    idx = int((ts - start_time).total_seconds() * self.config.sampling_rate)
+                    if idx < len(values):
+                        values[idx] = 1.0
+                
+                return time_grid, values
+                
+        except Exception as e:
+            logger.error(f"Data fetch failed: {str(e)}")
+            return None
+
+    def compute_global_threshold(self, datasets: List[np.ndarray]) -> float:
+        max_amplitudes = []
+        for data in datasets:
+            if data is None or len(data) < 10:
+                continue
+            for _ in range(self.config.permutation_iterations):
+                permuted = np.random.permutation(data)
+                _, fft_perm = self._compute_fft(permuted)
+                max_amplitudes.append(np.nanmax(fft_perm, initial=0))
+        
+        if not max_amplitudes:
+            raise ValueError("Insufficient data for threshold")
+        
+        index = int(math.ceil(self.config.confidence_level * len(max_amplitudes))) - 1
+        return sorted(max_amplitudes, reverse=True)[index]
+
+    def analyze_source(self, time_series: np.ndarray) -> Tuple[np.ndarray, np.ndarray, List[float]]:
+        freqs, amps = self._compute_fft(time_series)
+        significant_mask = amps >= self.global_threshold
+        return freqs[significant_mask], amps[significant_mask], self._find_acf_peaks(time_series)
+
+    def _compute_fft(self, data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
+        n = len(data)
+        fft_vals = np.abs(fft(data)) / n
+        freqs = fftfreq(n, d=1/self.config.sampling_rate)
+        return freqs[freqs > 0], fft_vals[freqs > 0]
+
+    def _find_acf_peaks(self, data: np.ndarray) -> List[float]:
+        n = len(data)
+        if n < 2:
+            return []
+            
+        padded = np.concatenate([data, np.zeros(n)])
+        fft_data = fft(padded)
+        acorr = np.real(np.fft.ifft(fft_data * np.conj(fft_data)))[:n]
+        acorr /= np.max(acorr)
+        
+        peaks, _ = signal.find_peaks(acorr, height=0.2)
+        valid_peaks = [p for p in peaks if p > 0]
+        return sorted(valid_peaks, key=lambda x: acorr[x], reverse=True)[:self.config.acf_top_peaks]
+
+    def correlate_domains(self, fft_freqs: np.ndarray, fft_amps: np.ndarray,
+                         acf_lags: List[int]) -> List[Tuple[float, float]]:
+        candidates = []
+        for lag in acf_lags:
+            expected_freq = 1 / lag if lag > 0 else 0
+            tolerance = expected_freq * self.config.frequency_tolerance
+            candidates.extend([(f, a) for f, a in zip(fft_freqs, fft_amps) 
+                             if abs(f - expected_freq) <= tolerance])
+        
+        seen = set()
+        return [(f, a) for f, a in candidates if not (f in seen or seen.add(f))]
+
+class AnalysisVisualizer:
+    # Custom color list for 13 beacons
+    BEACON_COLORS = [
+        '#8B0000', '#006400', '#000080', '#4B0082', '#8B008B',
+        '#556B2F', '#800080', '#2E8B57', '#8B4513', '#483D8B',
+        '#8B2500', '#2F4F4F', '#8B6969'
+    ]
+    
+    @staticmethod
+    def _get_beacon_number(label: str) -> int:
+        match = re.search(r'beacon(\d+)\.', label)
+        return int(match.group(1)) - 1 if match else 0
+    
+    @classmethod
+    def plot_results(cls, results: Dict[str, List[Tuple[float, float]]]) -> None:
+        # Create a 5x3 grid for 13 beacons
+        fig, axs = plt.subplots(5, 3, figsize=(20, 25))
+        fig.suptitle('Individual Beacon Temporal Patterns', fontsize=16, y=0.99)
+        plt.subplots_adjust(hspace=0.5, wspace=0.3)
+        
+        # Sort beacons numerically
+        sorted_beacons = sorted(results.items(),
+                               key=lambda x: cls._get_beacon_number(x[0]))
+        
+        for idx, (label, points) in enumerate(sorted_beacons[:13]):
+            row = idx // 3
+            col = idx % 3
+            ax = axs[row, col]
+            
+            if not points:
+                ax.axis('off')
+                continue
+                
+            freqs, amps = zip(*sorted(points, key=lambda x: x[0]))
+            beacon_num = cls._get_beacon_number(label)
+            
+            # Plot individual chart
+            ax.plot(freqs, amps, '-',
+                    color=cls.BEACON_COLORS[beacon_num % 13],
+                    linewidth=2)
+            
+            # Chart customization
+            ax.set_title(label, fontsize=10, pad=8)
+            ax.set_xlabel('Frequency (Hz)', fontsize=8)
+            ax.set_ylabel('Normalized Amplitude', fontsize=8)
+            ax.grid(True, alpha=0.3)
+            ax.set_xlim(0, 0.16)
+            ax.set_ylim(0, 0.03)
+            
+            # Add amplitude scale
+            ax.tick_params(axis='both', which='major', labelsize=7)
+        
+        # Hide empty subplots
+        for idx in range(len(sorted_beacons), 15):
+            axs.flatten()[idx].axis('off')
+        
+        plt.tight_layout()
+        plt.show()
+        
+def main():
+    config = AnalysisConfig()
+    analyzer = TemporalAnalyzer(config)
+    visualizer = AnalysisVisualizer()
+    
+    sources = {
+        **{f'beacon{i}.example.com': ('ADG', 'hostnames') for i in range(1, 14)}
+    }
+    
+    datasets = []
+    results = {}
+    
+    for label, (bucket, measurement) in sources.items():
+        if (ts_data := analyzer.fetch_temporal_data(bucket, measurement, 'url_hostname', label)):
+            time_grid, values = ts_data
+            filtered = analyzer._apply_filter(values)
+            datasets.append(filtered)
+            results[label] = (time_grid, filtered)
+    
+    try:
+        analyzer.global_threshold = analyzer.compute_global_threshold([d for d in datasets if d is not None])
+        logger.info(f"Global threshold: {analyzer.global_threshold:.3f}")
+    except ValueError as e:
+        logger.error(str(e))
+        return
+    
+    final_results = {}
+    for label, (time_grid, filtered_values) in results.items():
+        try:
+            fft_freqs, fft_amps, acf_lags = analyzer.analyze_source(filtered_values)
+            candidates = analyzer.correlate_domains(fft_freqs, fft_amps, acf_lags)
+            if candidates:
+                final_results[label] = candidates
+                logger.info(f"{label}: {len(candidates)} candidates")
+        except Exception as e:
+            logger.error(f"Analysis failed for {label}: {str(e)}")
+    
+    if final_results:
+        visualizer.plot_results(final_results)
+    else:
+        logger.error("No results to visualize")
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/Codes/FTT_autocorrelation/FFT_AutoCorrelation.py b/Codes/FTT_autocorrelation/FFT_AutoCorrelation_candidates.py
similarity index 86%
rename from Codes/FTT_autocorrelation/FFT_AutoCorrelation.py
rename to Codes/FTT_autocorrelation/FFT_AutoCorrelation_candidates.py
index b297392..1d45a13 100644
--- a/Codes/FTT_autocorrelation/FFT_AutoCorrelation.py
+++ b/Codes/FTT_autocorrelation/FFT_AutoCorrelation_candidates.py
@@ -30,22 +30,22 @@ class AnalysisConfig:
     frequency_tolerance: float = 0.05
 
 class TemporalAnalyzer:
-    def __init__(self, config: AnalysisConfig):
+    def __init__(self, config: AnalysisConfig): # set parameters
         self.config = config
-        self._setup_bandpass_filter()
+        self._setup_bandpass_filter() # apply it to the data
         
-    def _apply_filter(self, data: np.ndarray) -> np.ndarray:
-        """bandpass filtering"""
-        return signal.filtfilt(self.b, self.a, data)
+    def _apply_filter(self, data: np.ndarray) -> np.ndarray: #the method will return an object of type it
+        """ bandpass filtering"""
+        return signal.filtfilt(self.b, self.a, data) # apply the filter to the data and return the filtered data - define frequency of array
         
-    def _setup_bandpass_filter(self) -> None:
+    def _setup_bandpass_filter(self) -> None: # frequency of the filter is defined
         nyquist = 0.5 * self.config.sampling_rate
         low_freq = 1 / self.config.bandpass_low_period
         high_freq = 1 / self.config.bandpass_high_period
-        self.b, self.a = signal.butter(3, [low_freq/nyquist, high_freq/nyquist], 'band')
+        self.b, self.a = signal.butter(3, [low_freq/nyquist, high_freq/nyquist], 'band') #  order filter and frequency
 
     def fetch_temporal_data(self, bucket: str, measurement: str, 
-                           filter_field: str, filter_value: str) -> Optional[Tuple[np.ndarray, np.ndarray]]:
+                           filter_field: str, filter_value: str) -> Optional[Tuple[np.ndarray, np.ndarray]]: # Refers to the instance of the TemporalAnalyzer class. none if it's nothing
         try:
             with InfluxDBClient(self.config.influx_url, self.config.influx_token, org=self.config.influx_org) as client:
                 query = f'''
@@ -81,11 +81,11 @@ class TemporalAnalyzer:
     def compute_global_threshold(self, datasets: List[np.ndarray]) -> float:
         max_amplitudes = []
         for data in datasets:
-            if data is None or len(data) < 10:
+            if data is None or len(data) < 1: # if there is no data, continue
                 continue
             for _ in range(self.config.permutation_iterations):
                 permuted = np.random.permutation(data)
-                _, fft_perm = self._compute_fft(permuted)
+                _, fft_perm = self._compute_fft(permuted) # time domain to frequency domain 
                 max_amplitudes.append(np.nanmax(fft_perm, initial=0))
         
         if not max_amplitudes:
@@ -162,10 +162,11 @@ class AnalysisVisualizer:
             else:
                 color = cls.COLOR_MAP.get(label, '#666666')
             
-            plt.plot(freqs, amps, 'o-', color=color, markersize=8, linewidth=2, alpha=0.8, label=label)
+            # Remove markers by using '-' instead of 'o-'
+            plt.plot(freqs, amps, '-', color=color, linewidth=2, alpha=0.8, label=label)
         
-        plt.xlim(left=0)
-        plt.ylim(bottom=0)
+        plt.xlim(0, 0.2)
+        plt.ylim(0, 0.030)
         plt.axhline(0, color='black', linewidth=0.8)
         plt.axvline(0, color='black', linewidth=0.8)
         plt.title('Cross-Domain Temporal Pattern Candidates', pad=20)
@@ -181,9 +182,9 @@ def main():
     visualizer = AnalysisVisualizer()
     
     sources = {
-        'fpc.msedge.net': ('Net9', 'hostnames'),
+       # 'fpc.msedge.net': ('Net9', 'hostnames'),
         'm4v4r4c5.stackpathcdn.com': ('Net9', 'hostnames'),
-        **{f'beacon{i}.example.com': ('ADG', 'hostnames') for i in range(1, 14)}
+        **{f'beacon{i}.example.com': ('ADG', 'hostnames') for i in range(7, 8)}
     }
     
     datasets = []
-- 
GitLab