diff --git a/Codes/FTT_autocorrelation/FFT_AutoCorrelation_beacons - Copy.py:Zone.Identifier b/Codes/FTT_autocorrelation/FFT_AutoCorrelation_beacons - Copy.py:Zone.Identifier new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/Codes/FTT_autocorrelation/FFT_AutoCorrelation_beacons.py b/Codes/FTT_autocorrelation/FFT_AutoCorrelation_beacons.py new file mode 100644 index 0000000000000000000000000000000000000000..dd40e3b67c666e82a8888cc638a131fed49a000f --- /dev/null +++ b/Codes/FTT_autocorrelation/FFT_AutoCorrelation_beacons.py @@ -0,0 +1,236 @@ +""" +Advanced Signal Analysis Pipeline with Custom Color Scheme +""" +from dataclasses import dataclass +from typing import Dict, List, Optional, Tuple +import numpy as np +import matplotlib.pyplot as plt +from scipy.fft import fft, fftfreq +from scipy import signal +from influxdb_client import InfluxDBClient +import math +import logging +import re + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +@dataclass +class AnalysisConfig: + influx_url: str = "http://localhost:8086" + influx_token: str = "WUxftnono0_k_t620srsO7xNG15xcej5meoShrr1ONHGvWSEqwg3gJVhthKwux7wUyw1_1hm9TAQFWKeEBHK2g==" + influx_org: str = "Student" + sampling_rate: float = 1.0 + bandpass_low_period: int = 1860 + bandpass_high_period: int = 5 + permutation_iterations: int = 100 + confidence_level: float = 0.99 + acf_top_peaks: int = 20 + frequency_tolerance: float = 0.05 + +class TemporalAnalyzer: + def __init__(self, config: AnalysisConfig): + self.config = config + self._setup_bandpass_filter() + + def _apply_filter(self, data: np.ndarray) -> np.ndarray: + """Zero-phase bandpass filtering""" + return signal.filtfilt(self.b, self.a, data) + + def _setup_bandpass_filter(self) -> None: + nyquist = 0.5 * self.config.sampling_rate + low_freq = 1 / self.config.bandpass_low_period + high_freq = 1 / self.config.bandpass_high_period + self.b, self.a = signal.butter(3, [low_freq/nyquist, high_freq/nyquist], 'band') + + def fetch_temporal_data(self, bucket: str, measurement: str, + filter_field: str, filter_value: str) -> Optional[Tuple[np.ndarray, np.ndarray]]: + try: + with InfluxDBClient(self.config.influx_url, self.config.influx_token, org=self.config.influx_org) as client: + query = f''' + from(bucket: "{bucket}") + |> range(start: 2023-08-01T00:00:00Z, stop: 2023-08-02T00:00:00Z) + |> filter(fn: (r) => r["_measurement"] == "{measurement}") + |> filter(fn: (r) => r["{filter_field}"] == "{filter_value}") + |> keep(columns: ["_time"]) + ''' + result = client.query_api().query(query) + + timestamps = [record.get_time() for table in result for record in table.records] + if not timestamps: + logger.warning(f"No data for {filter_value}") + return None + + start_time = timestamps[0] + total_seconds = (timestamps[-1] - start_time).total_seconds() + time_grid = np.arange(0, total_seconds + 1, 1/self.config.sampling_rate) + values = np.zeros_like(time_grid, dtype=np.float32) + + for ts in timestamps: + idx = int((ts - start_time).total_seconds() * self.config.sampling_rate) + if idx < len(values): + values[idx] = 1.0 + + return time_grid, values + + except Exception as e: + logger.error(f"Data fetch failed: {str(e)}") + return None + + def compute_global_threshold(self, datasets: List[np.ndarray]) -> float: + max_amplitudes = [] + for data in datasets: + if data is None or len(data) < 10: + continue + for _ in range(self.config.permutation_iterations): + permuted = np.random.permutation(data) + _, fft_perm = self._compute_fft(permuted) + max_amplitudes.append(np.nanmax(fft_perm, initial=0)) + + if not max_amplitudes: + raise ValueError("Insufficient data for threshold") + + index = int(math.ceil(self.config.confidence_level * len(max_amplitudes))) - 1 + return sorted(max_amplitudes, reverse=True)[index] + + def analyze_source(self, time_series: np.ndarray) -> Tuple[np.ndarray, np.ndarray, List[float]]: + freqs, amps = self._compute_fft(time_series) + significant_mask = amps >= self.global_threshold + return freqs[significant_mask], amps[significant_mask], self._find_acf_peaks(time_series) + + def _compute_fft(self, data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: + n = len(data) + fft_vals = np.abs(fft(data)) / n + freqs = fftfreq(n, d=1/self.config.sampling_rate) + return freqs[freqs > 0], fft_vals[freqs > 0] + + def _find_acf_peaks(self, data: np.ndarray) -> List[float]: + n = len(data) + if n < 2: + return [] + + padded = np.concatenate([data, np.zeros(n)]) + fft_data = fft(padded) + acorr = np.real(np.fft.ifft(fft_data * np.conj(fft_data)))[:n] + acorr /= np.max(acorr) + + peaks, _ = signal.find_peaks(acorr, height=0.2) + valid_peaks = [p for p in peaks if p > 0] + return sorted(valid_peaks, key=lambda x: acorr[x], reverse=True)[:self.config.acf_top_peaks] + + def correlate_domains(self, fft_freqs: np.ndarray, fft_amps: np.ndarray, + acf_lags: List[int]) -> List[Tuple[float, float]]: + candidates = [] + for lag in acf_lags: + expected_freq = 1 / lag if lag > 0 else 0 + tolerance = expected_freq * self.config.frequency_tolerance + candidates.extend([(f, a) for f, a in zip(fft_freqs, fft_amps) + if abs(f - expected_freq) <= tolerance]) + + seen = set() + return [(f, a) for f, a in candidates if not (f in seen or seen.add(f))] + +class AnalysisVisualizer: + # Custom color list for 13 beacons + BEACON_COLORS = [ + '#8B0000', '#006400', '#000080', '#4B0082', '#8B008B', + '#556B2F', '#800080', '#2E8B57', '#8B4513', '#483D8B', + '#8B2500', '#2F4F4F', '#8B6969' + ] + + @staticmethod + def _get_beacon_number(label: str) -> int: + match = re.search(r'beacon(\d+)\.', label) + return int(match.group(1)) - 1 if match else 0 + + @classmethod + def plot_results(cls, results: Dict[str, List[Tuple[float, float]]]) -> None: + # Create a 5x3 grid for 13 beacons + fig, axs = plt.subplots(5, 3, figsize=(20, 25)) + fig.suptitle('Individual Beacon Temporal Patterns', fontsize=16, y=0.99) + plt.subplots_adjust(hspace=0.5, wspace=0.3) + + # Sort beacons numerically + sorted_beacons = sorted(results.items(), + key=lambda x: cls._get_beacon_number(x[0])) + + for idx, (label, points) in enumerate(sorted_beacons[:13]): + row = idx // 3 + col = idx % 3 + ax = axs[row, col] + + if not points: + ax.axis('off') + continue + + freqs, amps = zip(*sorted(points, key=lambda x: x[0])) + beacon_num = cls._get_beacon_number(label) + + # Plot individual chart + ax.plot(freqs, amps, '-', + color=cls.BEACON_COLORS[beacon_num % 13], + linewidth=2) + + # Chart customization + ax.set_title(label, fontsize=10, pad=8) + ax.set_xlabel('Frequency (Hz)', fontsize=8) + ax.set_ylabel('Normalized Amplitude', fontsize=8) + ax.grid(True, alpha=0.3) + ax.set_xlim(0, 0.16) + ax.set_ylim(0, 0.03) + + # Add amplitude scale + ax.tick_params(axis='both', which='major', labelsize=7) + + # Hide empty subplots + for idx in range(len(sorted_beacons), 15): + axs.flatten()[idx].axis('off') + + plt.tight_layout() + plt.show() + +def main(): + config = AnalysisConfig() + analyzer = TemporalAnalyzer(config) + visualizer = AnalysisVisualizer() + + sources = { + **{f'beacon{i}.example.com': ('ADG', 'hostnames') for i in range(1, 14)} + } + + datasets = [] + results = {} + + for label, (bucket, measurement) in sources.items(): + if (ts_data := analyzer.fetch_temporal_data(bucket, measurement, 'url_hostname', label)): + time_grid, values = ts_data + filtered = analyzer._apply_filter(values) + datasets.append(filtered) + results[label] = (time_grid, filtered) + + try: + analyzer.global_threshold = analyzer.compute_global_threshold([d for d in datasets if d is not None]) + logger.info(f"Global threshold: {analyzer.global_threshold:.3f}") + except ValueError as e: + logger.error(str(e)) + return + + final_results = {} + for label, (time_grid, filtered_values) in results.items(): + try: + fft_freqs, fft_amps, acf_lags = analyzer.analyze_source(filtered_values) + candidates = analyzer.correlate_domains(fft_freqs, fft_amps, acf_lags) + if candidates: + final_results[label] = candidates + logger.info(f"{label}: {len(candidates)} candidates") + except Exception as e: + logger.error(f"Analysis failed for {label}: {str(e)}") + + if final_results: + visualizer.plot_results(final_results) + else: + logger.error("No results to visualize") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/Codes/FTT_autocorrelation/FFT_AutoCorrelation.py b/Codes/FTT_autocorrelation/FFT_AutoCorrelation_candidates.py similarity index 86% rename from Codes/FTT_autocorrelation/FFT_AutoCorrelation.py rename to Codes/FTT_autocorrelation/FFT_AutoCorrelation_candidates.py index b2973927b01a5d3db58a669ce6796eda5caea916..1d45a13dfe754974afdadef21dc4059939e18021 100644 --- a/Codes/FTT_autocorrelation/FFT_AutoCorrelation.py +++ b/Codes/FTT_autocorrelation/FFT_AutoCorrelation_candidates.py @@ -30,22 +30,22 @@ class AnalysisConfig: frequency_tolerance: float = 0.05 class TemporalAnalyzer: - def __init__(self, config: AnalysisConfig): + def __init__(self, config: AnalysisConfig): # set parameters self.config = config - self._setup_bandpass_filter() + self._setup_bandpass_filter() # apply it to the data - def _apply_filter(self, data: np.ndarray) -> np.ndarray: - """bandpass filtering""" - return signal.filtfilt(self.b, self.a, data) + def _apply_filter(self, data: np.ndarray) -> np.ndarray: #the method will return an object of type it + """ bandpass filtering""" + return signal.filtfilt(self.b, self.a, data) # apply the filter to the data and return the filtered data - define frequency of array - def _setup_bandpass_filter(self) -> None: + def _setup_bandpass_filter(self) -> None: # frequency of the filter is defined nyquist = 0.5 * self.config.sampling_rate low_freq = 1 / self.config.bandpass_low_period high_freq = 1 / self.config.bandpass_high_period - self.b, self.a = signal.butter(3, [low_freq/nyquist, high_freq/nyquist], 'band') + self.b, self.a = signal.butter(3, [low_freq/nyquist, high_freq/nyquist], 'band') # order filter and frequency def fetch_temporal_data(self, bucket: str, measurement: str, - filter_field: str, filter_value: str) -> Optional[Tuple[np.ndarray, np.ndarray]]: + filter_field: str, filter_value: str) -> Optional[Tuple[np.ndarray, np.ndarray]]: # Refers to the instance of the TemporalAnalyzer class. none if it's nothing try: with InfluxDBClient(self.config.influx_url, self.config.influx_token, org=self.config.influx_org) as client: query = f''' @@ -81,11 +81,11 @@ class TemporalAnalyzer: def compute_global_threshold(self, datasets: List[np.ndarray]) -> float: max_amplitudes = [] for data in datasets: - if data is None or len(data) < 10: + if data is None or len(data) < 1: # if there is no data, continue continue for _ in range(self.config.permutation_iterations): permuted = np.random.permutation(data) - _, fft_perm = self._compute_fft(permuted) + _, fft_perm = self._compute_fft(permuted) # time domain to frequency domain max_amplitudes.append(np.nanmax(fft_perm, initial=0)) if not max_amplitudes: @@ -162,10 +162,11 @@ class AnalysisVisualizer: else: color = cls.COLOR_MAP.get(label, '#666666') - plt.plot(freqs, amps, 'o-', color=color, markersize=8, linewidth=2, alpha=0.8, label=label) + # Remove markers by using '-' instead of 'o-' + plt.plot(freqs, amps, '-', color=color, linewidth=2, alpha=0.8, label=label) - plt.xlim(left=0) - plt.ylim(bottom=0) + plt.xlim(0, 0.2) + plt.ylim(0, 0.030) plt.axhline(0, color='black', linewidth=0.8) plt.axvline(0, color='black', linewidth=0.8) plt.title('Cross-Domain Temporal Pattern Candidates', pad=20) @@ -181,9 +182,9 @@ def main(): visualizer = AnalysisVisualizer() sources = { - 'fpc.msedge.net': ('Net9', 'hostnames'), + # 'fpc.msedge.net': ('Net9', 'hostnames'), 'm4v4r4c5.stackpathcdn.com': ('Net9', 'hostnames'), - **{f'beacon{i}.example.com': ('ADG', 'hostnames') for i in range(1, 14)} + **{f'beacon{i}.example.com': ('ADG', 'hostnames') for i in range(7, 8)} } datasets = []