Commit b19fa54e authored by Aida Nikkhah Nasab

add FFT-based temporal analysis pipeline with custom visualization for beacon data

parent 93d542e0
Pipeline #58030 failed
"""
Advanced Signal Analysis Pipeline with Custom Color Scheme
"""
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
import numpy as np
import matplotlib.pyplot as plt
from scipy.fft import fft, fftfreq
from scipy import signal
from influxdb_client import InfluxDBClient
import math
import logging
import re
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class AnalysisConfig:
influx_url: str = "http://localhost:8086"
influx_token: str = "WUxftnono0_k_t620srsO7xNG15xcej5meoShrr1ONHGvWSEqwg3gJVhthKwux7wUyw1_1hm9TAQFWKeEBHK2g=="
influx_org: str = "Student"
sampling_rate: float = 1.0
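    # Band-pass limits are given as periods in seconds: keep cycles between 5 s and 1860 s (31 min).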
bandpass_low_period: int = 1860
bandpass_high_period: int = 5
permutation_iterations: int = 100
confidence_level: float = 0.99
acf_top_peaks: int = 20
frequency_tolerance: float = 0.05
class TemporalAnalyzer:
def __init__(self, config: AnalysisConfig):
self.config = config
self._setup_bandpass_filter()
def _apply_filter(self, data: np.ndarray) -> np.ndarray:
"""Zero-phase bandpass filtering"""
return signal.filtfilt(self.b, self.a, data)
def _setup_bandpass_filter(self) -> None:
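        # Convert the configured periods to cutoff frequencies, normalize by the Nyquist
        # frequency, and design a 3rd-order Butterworth band-pass filter.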
nyquist = 0.5 * self.config.sampling_rate
low_freq = 1 / self.config.bandpass_low_period
high_freq = 1 / self.config.bandpass_high_period
self.b, self.a = signal.butter(3, [low_freq/nyquist, high_freq/nyquist], 'band')
def fetch_temporal_data(self, bucket: str, measurement: str,
filter_field: str, filter_value: str) -> Optional[Tuple[np.ndarray, np.ndarray]]:
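        # Query the event timestamps from InfluxDB and rasterize them onto a regular grid
        # at sampling_rate, producing a binary presence series (1.0 at each observed request).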
try:
with InfluxDBClient(self.config.influx_url, self.config.influx_token, org=self.config.influx_org) as client:
query = f'''
from(bucket: "{bucket}")
|> range(start: 2023-08-01T00:00:00Z, stop: 2023-08-02T00:00:00Z)
|> filter(fn: (r) => r["_measurement"] == "{measurement}")
|> filter(fn: (r) => r["{filter_field}"] == "{filter_value}")
|> keep(columns: ["_time"])
'''
result = client.query_api().query(query)
timestamps = [record.get_time() for table in result for record in table.records]
if not timestamps:
logger.warning(f"No data for {filter_value}")
return None
start_time = timestamps[0]
total_seconds = (timestamps[-1] - start_time).total_seconds()
time_grid = np.arange(0, total_seconds + 1, 1/self.config.sampling_rate)
values = np.zeros_like(time_grid, dtype=np.float32)
for ts in timestamps:
idx = int((ts - start_time).total_seconds() * self.config.sampling_rate)
if idx < len(values):
values[idx] = 1.0
return time_grid, values
except Exception as e:
logger.error(f"Data fetch failed: {str(e)}")
return None
def compute_global_threshold(self, datasets: List[np.ndarray]) -> float:
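        # Permutation test: shuffling each series destroys any temporal structure, so the
        # maxima of the permuted spectra form a null distribution; the global threshold is
        # the confidence_level quantile of those maxima.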
max_amplitudes = []
for data in datasets:
if data is None or len(data) < 10:
continue
for _ in range(self.config.permutation_iterations):
permuted = np.random.permutation(data)
_, fft_perm = self._compute_fft(permuted)
max_amplitudes.append(np.nanmax(fft_perm, initial=0))
if not max_amplitudes:
raise ValueError("Insufficient data for threshold")
        index = int(math.ceil(self.config.confidence_level * len(max_amplitudes))) - 1
        return sorted(max_amplitudes)[index]  # confidence_level quantile (99th percentile by default) of the permuted maxima
    def analyze_source(self, time_series: np.ndarray) -> Tuple[np.ndarray, np.ndarray, List[int]]:
freqs, amps = self._compute_fft(time_series)
significant_mask = amps >= self.global_threshold
return freqs[significant_mask], amps[significant_mask], self._find_acf_peaks(time_series)
def _compute_fft(self, data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
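        # One-sided amplitude spectrum: |DFT| scaled by the series length, positive frequencies only.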
n = len(data)
fft_vals = np.abs(fft(data)) / n
freqs = fftfreq(n, d=1/self.config.sampling_rate)
return freqs[freqs > 0], fft_vals[freqs > 0]
    def _find_acf_peaks(self, data: np.ndarray) -> List[int]:
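        # Autocorrelation via the Wiener-Khinchin theorem: zero-pad, multiply the FFT by its
        # conjugate, inverse-transform, then keep the strongest peaks (lags in samples).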
n = len(data)
if n < 2:
return []
padded = np.concatenate([data, np.zeros(n)])
fft_data = fft(padded)
acorr = np.real(np.fft.ifft(fft_data * np.conj(fft_data)))[:n]
acorr /= np.max(acorr)
peaks, _ = signal.find_peaks(acorr, height=0.2)
valid_peaks = [p for p in peaks if p > 0]
return sorted(valid_peaks, key=lambda x: acorr[x], reverse=True)[:self.config.acf_top_peaks]
def correlate_domains(self, fft_freqs: np.ndarray, fft_amps: np.ndarray,
acf_lags: List[int]) -> List[Tuple[float, float]]:
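        # Cross-check the two domains: keep FFT peaks whose frequency is within the relative
        # frequency_tolerance of 1/lag for some ACF peak (at the default 1 Hz sampling, a lag
        # in samples maps directly to a frequency of 1/lag Hz).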
candidates = []
for lag in acf_lags:
expected_freq = 1 / lag if lag > 0 else 0
tolerance = expected_freq * self.config.frequency_tolerance
candidates.extend([(f, a) for f, a in zip(fft_freqs, fft_amps)
if abs(f - expected_freq) <= tolerance])
seen = set()
return [(f, a) for f, a in candidates if not (f in seen or seen.add(f))]
class AnalysisVisualizer:
# Custom color list for 13 beacons
BEACON_COLORS = [
'#8B0000', '#006400', '#000080', '#4B0082', '#8B008B',
'#556B2F', '#800080', '#2E8B57', '#8B4513', '#483D8B',
'#8B2500', '#2F4F4F', '#8B6969'
]
@staticmethod
def _get_beacon_number(label: str) -> int:
match = re.search(r'beacon(\d+)\.', label)
return int(match.group(1)) - 1 if match else 0
@classmethod
def plot_results(cls, results: Dict[str, List[Tuple[float, float]]]) -> None:
# Create a 5x3 grid for 13 beacons
fig, axs = plt.subplots(5, 3, figsize=(20, 25))
fig.suptitle('Individual Beacon Temporal Patterns', fontsize=16, y=0.99)
plt.subplots_adjust(hspace=0.5, wspace=0.3)
# Sort beacons numerically
sorted_beacons = sorted(results.items(),
key=lambda x: cls._get_beacon_number(x[0]))
for idx, (label, points) in enumerate(sorted_beacons[:13]):
row = idx // 3
col = idx % 3
ax = axs[row, col]
if not points:
ax.axis('off')
continue
freqs, amps = zip(*sorted(points, key=lambda x: x[0]))
beacon_num = cls._get_beacon_number(label)
# Plot individual chart
ax.plot(freqs, amps, '-',
color=cls.BEACON_COLORS[beacon_num % 13],
linewidth=2)
# Chart customization
ax.set_title(label, fontsize=10, pad=8)
ax.set_xlabel('Frequency (Hz)', fontsize=8)
ax.set_ylabel('Normalized Amplitude', fontsize=8)
ax.grid(True, alpha=0.3)
ax.set_xlim(0, 0.16)
ax.set_ylim(0, 0.03)
            # Shrink tick labels so the 13 small subplots stay readable
ax.tick_params(axis='both', which='major', labelsize=7)
# Hide empty subplots
for idx in range(len(sorted_beacons), 15):
axs.flatten()[idx].axis('off')
plt.tight_layout()
plt.show()
def main():
config = AnalysisConfig()
analyzer = TemporalAnalyzer(config)
visualizer = AnalysisVisualizer()
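    # Map each monitored hostname to its (bucket, measurement) pair in InfluxDB.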
sources = {
**{f'beacon{i}.example.com': ('ADG', 'hostnames') for i in range(1, 14)}
}
datasets = []
results = {}
for label, (bucket, measurement) in sources.items():
if (ts_data := analyzer.fetch_temporal_data(bucket, measurement, 'url_hostname', label)):
time_grid, values = ts_data
filtered = analyzer._apply_filter(values)
datasets.append(filtered)
results[label] = (time_grid, filtered)
try:
analyzer.global_threshold = analyzer.compute_global_threshold([d for d in datasets if d is not None])
logger.info(f"Global threshold: {analyzer.global_threshold:.3f}")
except ValueError as e:
logger.error(str(e))
return
final_results = {}
for label, (time_grid, filtered_values) in results.items():
try:
fft_freqs, fft_amps, acf_lags = analyzer.analyze_source(filtered_values)
candidates = analyzer.correlate_domains(fft_freqs, fft_amps, acf_lags)
if candidates:
final_results[label] = candidates
logger.info(f"{label}: {len(candidates)} candidates")
except Exception as e:
logger.error(f"Analysis failed for {label}: {str(e)}")
if final_results:
visualizer.plot_results(final_results)
else:
logger.error("No results to visualize")
if __name__ == "__main__":
main()
@@ -30,22 +30,22 @@ class AnalysisConfig:
     frequency_tolerance: float = 0.05
 class TemporalAnalyzer:
-    def __init__(self, config: AnalysisConfig):
+    def __init__(self, config: AnalysisConfig):  # store the analysis parameters
         self.config = config
-        self._setup_bandpass_filter()
+        self._setup_bandpass_filter()  # design the band-pass filter once at construction time
-    def _apply_filter(self, data: np.ndarray) -> np.ndarray:
-        """bandpass filtering"""
-        return signal.filtfilt(self.b, self.a, data)
+    def _apply_filter(self, data: np.ndarray) -> np.ndarray:  # returns the filtered series as an ndarray
+        """Zero-phase bandpass filtering"""
+        return signal.filtfilt(self.b, self.a, data)  # forward-backward filtering, so no phase shift is introduced
-    def _setup_bandpass_filter(self) -> None:
+    def _setup_bandpass_filter(self) -> None:  # derive the filter coefficients from the configured periods
         nyquist = 0.5 * self.config.sampling_rate
         low_freq = 1 / self.config.bandpass_low_period
         high_freq = 1 / self.config.bandpass_high_period
-        self.b, self.a = signal.butter(3, [low_freq/nyquist, high_freq/nyquist], 'band')
+        self.b, self.a = signal.butter(3, [low_freq/nyquist, high_freq/nyquist], 'band')  # 3rd-order Butterworth band-pass on Nyquist-normalized cutoffs
     def fetch_temporal_data(self, bucket: str, measurement: str,
-                            filter_field: str, filter_value: str) -> Optional[Tuple[np.ndarray, np.ndarray]]:
+                            filter_field: str, filter_value: str) -> Optional[Tuple[np.ndarray, np.ndarray]]:  # returns (time grid, binary series) or None
         try:
             with InfluxDBClient(self.config.influx_url, self.config.influx_token, org=self.config.influx_org) as client:
                 query = f'''
@@ -81,11 +81,11 @@ class TemporalAnalyzer:
     def compute_global_threshold(self, datasets: List[np.ndarray]) -> float:
         max_amplitudes = []
         for data in datasets:
-            if data is None or len(data) < 10:
+            if data is None or len(data) < 1:  # skip sources with no samples
                 continue
             for _ in range(self.config.permutation_iterations):
                 permuted = np.random.permutation(data)
-                _, fft_perm = self._compute_fft(permuted)
+                _, fft_perm = self._compute_fft(permuted)  # amplitude spectrum of the shuffled (null) series
                 max_amplitudes.append(np.nanmax(fft_perm, initial=0))
         if not max_amplitudes:
@@ -162,10 +162,11 @@ class AnalysisVisualizer:
             else:
                 color = cls.COLOR_MAP.get(label, '#666666')
-            plt.plot(freqs, amps, 'o-', color=color, markersize=8, linewidth=2, alpha=0.8, label=label)
+            # Plot lines only ('-') instead of lines with per-point markers ('o-')
+            plt.plot(freqs, amps, '-', color=color, linewidth=2, alpha=0.8, label=label)
-        plt.xlim(left=0)
-        plt.ylim(bottom=0)
+        plt.xlim(0, 0.2)
+        plt.ylim(0, 0.030)
         plt.axhline(0, color='black', linewidth=0.8)
         plt.axvline(0, color='black', linewidth=0.8)
         plt.title('Cross-Domain Temporal Pattern Candidates', pad=20)
@@ -181,9 +182,9 @@ def main():
     visualizer = AnalysisVisualizer()
     sources = {
-        'fpc.msedge.net': ('Net9', 'hostnames'),
+        # 'fpc.msedge.net': ('Net9', 'hostnames'),
         'm4v4r4c5.stackpathcdn.com': ('Net9', 'hostnames'),
-        **{f'beacon{i}.example.com': ('ADG', 'hostnames') for i in range(1, 14)}
+        **{f'beacon{i}.example.com': ('ADG', 'hostnames') for i in range(7, 8)}
     }
     datasets = []
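As a rough, self-contained way to exercise the committed pipeline without an InfluxDB instance, a sketch along these lines could be run alongside the script for local testing. The synthetic_beacon helper, its 600 s period, the jitter value, and the smoke_test name are illustrative assumptions, not part of the commit; AnalysisConfig and TemporalAnalyzer are assumed to be in scope from the script above.

def synthetic_beacon(period_s: int = 600, duration_s: int = 86400, jitter: float = 0.3) -> np.ndarray:
    """Binary 1 Hz series with one event roughly every period_s seconds (illustrative helper)."""
    rng = np.random.default_rng(0)
    values = np.zeros(duration_s, dtype=np.float32)
    t = 0.0
    while t < duration_s:
        values[int(t)] = 1.0
        t += period_s * (1 + jitter * (rng.random() - 0.5))  # jittered inter-event gap
    return values

def smoke_test() -> None:
    """Run the filter -> threshold -> FFT/ACF cross-check on synthetic data."""
    analyzer = TemporalAnalyzer(AnalysisConfig())
    series = analyzer._apply_filter(synthetic_beacon())
    analyzer.global_threshold = analyzer.compute_global_threshold([series])
    freqs, amps, lags = analyzer.analyze_source(series)
    # Candidate (frequency, amplitude) pairs; a peak near 1/600 Hz is expected.
    print(analyzer.correlate_domains(freqs, amps, lags))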