Skip to content
Snippets Groups Projects
Commit 0ca60d39 authored by Aida Nikkhah Nasab's avatar Aida Nikkhah Nasab
Browse files

remove obsolete images and scripts; add new artificialbeacons image for updated content

parent bd7d14e0
No related branches found
Tags v1.126
No related merge requests found
Pipeline #57853 failed
Codes/FTT_autocorrelation/FFT3.png

64.8 KiB

"""
Advanced Signal Analysis Pipeline for Temporal Pattern Detection
"""
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
import numpy as np
import matplotlib.pyplot as plt
from scipy.fft import fft, fftfreq
# (Ensure you import your InfluxDBClient if not already imported)
from scipy import signal
from influxdb_client import InfluxDBClient
import math
import logging
import re
# InfluxDB connection details, read as module globals by fetch_and_process_data.
url = "http://localhost:8086"
# NOTE(review): this API token is committed in plain text; it should be rotated
# and supplied from outside the source tree (environment / secrets store).
token = "WUxftnono0_k_t620srsO7xNG15xcej5meoShrr1ONHGvWSEqwg3gJVhthKwux7wUyw1_1hm9TAQFWKeEBHK2g=="
org = "Student"
# Configure logging: INFO level on the root logger, plus a module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Function to fetch and process data
def fetch_and_process_data(bucket, measurement, filter_value, filter_field="url_hostname"):
    """Fetch event timestamps from InfluxDB and rasterise them onto a 1 s grid.

    Connects with the module-level ``url``, ``token`` and ``org``, selects the
    records of ``measurement`` in ``bucket`` whose ``filter_field`` equals
    ``filter_value`` (fixed window 2023-08-01 .. 2023-08-02 UTC) and keeps only
    their ``_time`` column.

    Returns:
        ``(time_grid, values)`` where ``time_grid`` counts seconds since the
        earliest event and ``values`` is 1.0 in every second that contains at
        least one event, 0.0 elsewhere; ``(None, None)`` when the query yields
        no records.
    """
    with InfluxDBClient(url=url, token=token, org=org) as client:
        query_api = client.query_api()
        query = f'''
from(bucket: "{bucket}")
|> range(start: 2023-08-01T00:00:00Z, stop: 2023-08-02T00:00:00Z)
|> filter(fn: (r) => r["_measurement"] == "{measurement}")
|> filter(fn: (r) => r["{filter_field}"] == "{filter_value}")
|> keep(columns: ["_time"])
'''
        timestamps = [record.get_time() for table in query_api.query(query) for record in table.records]
    if not timestamps:
        return None, None
    # The query does not sort, so order the records ourselves: an event earlier
    # than timestamps[0] would otherwise produce a negative offset, which numpy
    # silently wraps around to the end of the array.
    timestamps.sort()
    origin = timestamps[0]
    time_grid = np.arange(0, (timestamps[-1] - origin).total_seconds() + 1, 1)
    values = np.zeros_like(time_grid, dtype=float)
    offsets = [int((t - origin).total_seconds()) for t in timestamps]
    values[offsets] = 1
    return time_grid, values
@dataclass
class AnalysisConfig:
    """Settings bundle for the temporal analysis pipeline.

    Every field has a default, so ``AnalysisConfig()`` is a complete
    configuration; override individual fields by keyword.
    """

    # --- InfluxDB connection ---
    influx_url: str = "http://localhost:8086"
    # NOTE(review): credential committed in source; rotate it and inject it
    # from the environment or a secrets store instead.
    influx_token: str = "WUxftnono0_k_t620srsO7xNG15xcej5meoShrr1ONHGvWSEqwg3gJVhthKwux7wUyw1_1hm9TAQFWKeEBHK2g=="
    influx_org: str = "Student"

    # --- Signal processing ---
    # Samples per second of the rasterised event series.
    sampling_rate: float = 1.0
    # Periods whose reciprocals are used as the band-pass cutoff frequencies
    # (the long period gives the lower edge, the short one the upper edge).
    bandpass_low_period: int = 1860
    bandpass_high_period: int = 5

    # --- Significance testing ---
    # Number of random permutations drawn per dataset.
    permutation_iterations: int = 100
    # Fraction used to pick the threshold from the permutation maxima.
    confidence_level: float = 0.99

    # --- Autocorrelation / frequency matching ---
    # Maximum number of autocorrelation peaks reported per series.
    acf_top_peaks: int = 20
    # Relative tolerance when matching a frequency against 1/lag.
    frequency_tolerance: float = 0.05
# Fetch data for all buckets.
# NOTE: these calls execute at import time and each one queries InfluxDB.
# A call yields (None, None) when its query returns no records, so the
# values below may be None.
time_grid1, values1 = fetch_and_process_data("Net", "hostnames", "saml.allianz.com")
time_grid2, values2 = fetch_and_process_data("Net8", "beacon_activity", "example1.beacon.com", filter_field="url")
time_grid3, values3 = fetch_and_process_data("Net9", "hostnames", "m4v4r4c5.stackpathcdn.com")
class TemporalAnalyzer:
    """Band-pass filtering and spectral analysis of binary event series."""

    def __init__(self, config: "AnalysisConfig"):
        """Store ``config`` and design the band-pass filter once up front."""
        self.config = config
        self._setup_bandpass_filter()

    def _apply_filter(self, data: np.ndarray) -> np.ndarray:
        """Return ``data`` band-pass filtered forwards and backwards (zero phase)."""
        return signal.sosfiltfilt(self.sos, data)

    def _setup_bandpass_filter(self) -> None:
        """Design the 3rd-order Butterworth band-pass described by the config.

        The cutoffs are the reciprocals of ``bandpass_low_period`` and
        ``bandpass_high_period``, normalised by the Nyquist frequency.

        Raises:
            ValueError: if the cutoffs do not satisfy
                ``0 < low < high < nyquist``.
        """
        nyquist = 0.5 * self.config.sampling_rate
        low_freq = 1 / self.config.bandpass_low_period
        high_freq = 1 / self.config.bandpass_high_period
        if not 0 < low_freq < high_freq < nyquist:
            raise ValueError(
                f"Invalid band-pass edges: need 0 < {low_freq} < {high_freq} < {nyquist}"
            )
        band = [low_freq / nyquist, high_freq / nyquist]
        # Second-order sections stay numerically stable even though the lower
        # edge sits very close to DC; the polynomial (b, a) form does not.
        self.sos = signal.butter(3, band, btype='band', output='sos')
        # Kept for any code that still reads the transfer-function coefficients.
        self.b, self.a = signal.butter(3, band, btype='band')
# Function to apply Fourier Transform with color adjustments
def apply_fourier_transform(time_grid, values, label, color, linestyle='-', linewidth=2, alpha=1.0):
freqs = fftfreq(len(values), d=1.0)
amplitudes = np.abs(fft(values)[:len(values)//2])
def fetch_temporal_data(self, bucket: str, measurement: str,
                        filter_field: str, filter_value: str) -> Optional[Tuple[np.ndarray, np.ndarray]]:
    """Fetch event timestamps from InfluxDB and rasterise them to a binary series.

    Queries ``measurement`` in ``bucket`` for records whose ``filter_field``
    equals ``filter_value`` (fixed window 2023-08-01 .. 2023-08-02 UTC), using
    the connection settings in ``self.config``.

    Returns:
        ``(time_grid, values)``: ``time_grid`` is seconds since the earliest
        event, stepped by ``1 / sampling_rate``; ``values`` is a float32 array
        that is 1.0 in every bin containing an event. ``None`` when there is
        no data or when anything fails (the failure is logged, not raised).
    """
    try:
        with InfluxDBClient(self.config.influx_url, self.config.influx_token, org=self.config.influx_org) as client:
            query = f'''
from(bucket: "{bucket}")
|> range(start: 2023-08-01T00:00:00Z, stop: 2023-08-02T00:00:00Z)
|> filter(fn: (r) => r["_measurement"] == "{measurement}")
|> filter(fn: (r) => r["{filter_field}"] == "{filter_value}")
|> keep(columns: ["_time"])
'''
            result = client.query_api().query(query)
            # The query does not sort; order the events so that the first one
            # really is the earliest and no offset below can become negative
            # (a negative index would pass the upper-bound check and wrap).
            timestamps = sorted(record.get_time() for table in result for record in table.records)
        if not timestamps:
            logger.warning(f"No data for {filter_value}")
            return None
        start_time = timestamps[0]
        total_seconds = (timestamps[-1] - start_time).total_seconds()
        time_grid = np.arange(0, total_seconds + 1, 1/self.config.sampling_rate)
        values = np.zeros_like(time_grid, dtype=np.float32)
        for ts in timestamps:
            idx = int((ts - start_time).total_seconds() * self.config.sampling_rate)
            if 0 <= idx < len(values):
                values[idx] = 1.0
        return time_grid, values
    except Exception as e:
        # Deliberate best-effort boundary: callers treat None as "no data".
        logger.exception(f"Data fetch failed: {str(e)}")
        return None
# Apply amplitude limit only for saml.allianz.com
if label == "(saml.allianz.com)":
amplitudes = np.minimum(amplitudes, 50)
def compute_global_threshold(self, datasets: List[np.ndarray]) -> float:
    """Derive a significance threshold for FFT amplitudes by permutation.

    Each usable dataset (not ``None``, at least 10 samples) is shuffled
    ``permutation_iterations`` times; the largest FFT amplitude of every
    shuffle is collected. The threshold is the ``confidence_level`` quantile
    of those maxima, i.e. a value that only about ``1 - confidence_level`` of
    the shuffled series exceed.

    Raises:
        ValueError: if no dataset was usable.
    """
    max_amplitudes = []
    for data in datasets:
        if data is None or len(data) < 10:
            continue
        for _ in range(self.config.permutation_iterations):
            permuted = np.random.permutation(data)
            _, fft_perm = self._compute_fft(permuted)
            max_amplitudes.append(np.nanmax(fft_perm, initial=0))
    if not max_amplitudes:
        raise ValueError("Insufficient data for threshold")
    # Ascending order: position ceil(c * N) - 1 is then the c-quantile.
    # (Sorting in descending order, as before, picked the (1 - c)-quantile,
    # which is close to the smallest maximum and flags almost everything.)
    max_amplitudes.sort()
    index = int(math.ceil(self.config.confidence_level * len(max_amplitudes))) - 1
    # Clamp so that confidence levels of 0 or above 1 cannot index out of range
    # or wrap around via a negative index.
    index = min(max(index, 0), len(max_amplitudes) - 1)
    return max_amplitudes[index]
plt.plot(freqs[:len(values)//2], amplitudes, label=label, color=color, linestyle=linestyle, linewidth=linewidth, alpha=alpha)
plt.ylim(0, 600)
def analyze_source(self, time_series: np.ndarray) -> Tuple[np.ndarray, np.ndarray, List[float]]:
    """Analyse one (already filtered) series in both frequency and lag domain.

    Returns the frequencies and amplitudes whose amplitude reaches
    ``self.global_threshold``, together with the autocorrelation peak lags
    reported by ``self._find_acf_peaks``.
    """
    frequencies, amplitudes = self._compute_fft(time_series)
    keep = amplitudes >= self.global_threshold
    kept_frequencies = frequencies[keep]
    kept_amplitudes = amplitudes[keep]
    peak_lags = self._find_acf_peaks(time_series)
    return kept_frequencies, kept_amplitudes, peak_lags
# Plot combined FFT results
plt.figure(figsize=(10, 6))
def _compute_fft(self, data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """Return the strictly positive frequencies of ``data`` and their amplitudes.

    Amplitudes are FFT magnitudes divided by the number of samples; the
    frequency axis uses a sample spacing of ``1 / sampling_rate``.
    """
    sample_count = len(data)
    magnitudes = np.abs(fft(data)) / sample_count
    frequencies = fftfreq(sample_count, d=1/self.config.sampling_rate)
    # One mask, reused for both arrays, drops DC and the negative half.
    positive = frequencies > 0
    return frequencies[positive], magnitudes[positive]
apply_fourier_transform(time_grid1, values1, "(saml.allianz.com)", "red", linewidth=3) # Red, thick solid line
apply_fourier_transform(time_grid2, values2, "(example1.beacon.com)", "lightblue", alpha=0.5) # Light blue, semi-transparent (solid: no linestyle is passed)
apply_fourier_transform(time_grid3, values3, "(m4v4r4c5.stackpathcdn.com)", "green", alpha=0.5) # Green, semi-transparent
def _find_acf_peaks(self, data: np.ndarray) -> List[int]:
    """Return the lags (in samples) of the strongest autocorrelation peaks.

    The autocorrelation is computed through the FFT on a zero-padded copy of
    ``data`` (padding to twice the length avoids circular wrap-around),
    normalised by its maximum, and searched for local maxima of height at
    least 0.2. At most ``acf_top_peaks`` lags are returned, strongest first.
    An empty list is returned for fewer than two samples or for a series
    without energy (e.g. all zeros).
    """
    n = len(data)
    if n < 2:
        return []
    padded = np.concatenate([data, np.zeros(n)])
    fft_data = fft(padded)
    # Wiener-Khinchin: inverse transform of the power spectrum.
    acorr = np.real(np.fft.ifft(fft_data * np.conj(fft_data)))[:n]
    peak_value = np.max(acorr)
    if not np.isfinite(peak_value) or peak_value <= 0:
        # Nothing to normalise by; dividing would only produce NaN/inf.
        return []
    acorr = acorr / peak_value
    peaks, _ = signal.find_peaks(acorr, height=0.2)
    valid_peaks = [int(p) for p in peaks if p > 0]
    return sorted(valid_peaks, key=lambda lag: acorr[lag], reverse=True)[:self.config.acf_top_peaks]
plt.title("Combined Frequency Spectrum (Fourier Transform)")
plt.xlabel("Frequency (Hz)")
plt.ylabel("Amplitude")
plt.legend()
plt.grid()
plt.show()
def correlate_domains(self, fft_freqs: np.ndarray, fft_amps: np.ndarray,
                      acf_lags: List[int]) -> List[Tuple[float, float]]:
    """Keep the spectral points that agree with an autocorrelation lag.

    For every lag the expected frequency is ``1 / lag`` (0 for non-positive
    lags); a ``(frequency, amplitude)`` pair is accepted when the frequency
    lies within ``frequency_tolerance`` (relative) of that expectation.
    Each frequency is reported once, in order of first acceptance.
    """
    first_hit = {}
    for lag in acf_lags:
        if lag > 0:
            target = 1 / lag
        else:
            target = 0
        allowed = target * self.config.frequency_tolerance
        for freq, amp in zip(fft_freqs, fft_amps):
            if abs(freq - target) <= allowed and freq not in first_hit:
                # Insertion order of the dict preserves first-seen ordering.
                first_hit[freq] = amp
    return list(first_hit.items())
# Function to apply Autocorrelation with adjusted visualization
def apply_autocorrelation(values, label, color, linestyle='-', linewidth=2, alpha=1.0):
    """Plot the normalised autocorrelation of ``values`` on the current figure.

    Args:
        values: 1-D sequence of samples, or ``None`` when no data was fetched.
        label, color, linestyle, linewidth, alpha: forwarded to ``plt.plot``.

    A missing or empty series is skipped with a warning instead of raising.
    """
    if values is None or len(values) == 0:
        logger.warning(f"No data for {label}; skipping autocorrelation plot")
        return
    series = np.asarray(values, dtype=float)
    n = len(series)
    # FFT-based linear autocorrelation (zero-padded to 2n so nothing wraps):
    # same result as np.correlate(..., mode='full')[n - 1:] up to rounding,
    # but O(n log n) instead of O(n^2) on day-long 1 Hz series.
    spectrum = np.fft.rfft(series, 2 * n)
    autocorr = np.fft.irfft(spectrum * np.conj(spectrum), 2 * n)[:n]
    peak = np.max(autocorr)
    if peak > 0:
        autocorr = autocorr / peak  # Normalize so the zero-lag value is 1
    lags = np.arange(n)
    plt.plot(lags, autocorr, label=label, color=color, linestyle=linestyle, linewidth=linewidth, alpha=alpha)
class AnalysisVisualizer:
    """Plots the per-source frequency candidates produced by the analysis."""

    # Fixed colours for the known non-beacon hosts; others fall back to grey.
    COLOR_MAP = {
        'fpc.msedge.net': '#FF6B6B',
        'm4v4r4c5.stackpathcdn.com': '#4ECDC4'
    }
    # 13-entry palette for the beacon sources. ``plt.get_cmap`` is used
    # because ``matplotlib.cm.get_cmap`` was removed in Matplotlib 3.9.
    BEACON_CMAP = plt.get_cmap('tab20', 13)

    @staticmethod
    def _get_beacon_number(label: str) -> int:
        """Return the zero-based N from a ``beaconN.`` label, or 0 if absent."""
        match = re.search(r'beacon(\d+)\.', label)
        return int(match.group(1)) - 1 if match else 0

    @classmethod
    def plot_results(cls, results: Dict[str, List[Tuple[float, float]]]) -> None:
        """Draw one line per source from its ``(frequency, amplitude)`` points.

        Sources with no points are skipped. Labels containing ``beacon`` are
        coloured from ``BEACON_CMAP``; all others from ``COLOR_MAP``.
        Blocks in ``plt.show()`` until the window is closed.
        """
        plt.figure(figsize=(16, 9), tight_layout=True)
        plt.margins(0)
        for label, points in results.items():
            if not points:
                continue
            # Sort by frequency so the connecting line runs left to right.
            freqs, amps = zip(*sorted(points, key=lambda x: x[0]))
            if 'beacon' in label:
                beacon_num = cls._get_beacon_number(label)
                color = cls.BEACON_CMAP(beacon_num / 12.0)
            else:
                color = cls.COLOR_MAP.get(label, '#666666')
            plt.plot(freqs, amps, 'o-', color=color, markersize=8, linewidth=2, alpha=0.8, label=label)
        plt.xlim(left=0)
        plt.ylim(bottom=0)
        plt.axhline(0, color='black', linewidth=0.8)
        plt.axvline(0, color='black', linewidth=0.8)
        plt.title('Cross-Domain Temporal Pattern Candidates', pad=20)
        plt.xlabel('Frequency (Hz)', labelpad=15)
        plt.ylabel('Normalized Amplitude', labelpad=15)
        plt.grid(True, alpha=0.3)
        # Legend sits outside the axes on the right.
        plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
        plt.show()
# Plot combined Autocorrelation results with adjusted colors and styles.
# All three curves are drawn on one figure; plt.show() blocks until it is closed.
plt.figure(figsize=(10, 6))
apply_autocorrelation(values1, "(saml.allianz.com)", "red", linewidth=3) # Thicker and red to stand out
apply_autocorrelation(values2, "(example1.beacon.com)", "lightblue", alpha=0.5) # Light color, semi-transparent (solid: no linestyle is passed)
apply_autocorrelation(values3, "(m4v4r4c5.stackpathcdn.com)", "green", alpha=0.5) # Green, semi-transparent
plt.title("Combined Autocorrelation Function")
plt.xlabel("Sequence Offset")
plt.ylabel("Autocorrelation")
plt.legend()
plt.grid()
plt.show()
def main():
    """Run the full pipeline: fetch, filter, threshold, analyse and plot.

    Each source is fetched and band-pass filtered once. A global amplitude
    threshold is derived from all filtered series, then every series is
    analysed and the frequency candidates that agree with an autocorrelation
    lag are plotted. Problems with a single source are logged and that source
    is skipped; the run only stops early when no threshold can be computed.
    """
    config = AnalysisConfig()
    analyzer = TemporalAnalyzer(config)
    visualizer = AnalysisVisualizer()
    sources = {
        'fpc.msedge.net': ('Net9', 'hostnames'),
        'm4v4r4c5.stackpathcdn.com': ('Net9', 'hostnames'),
        **{f'beacon{i}.example.com': ('ADG', 'hostnames') for i in range(1, 14)}
    }
    results = {}
    for label, (bucket, measurement) in sources.items():
        ts_data = analyzer.fetch_temporal_data(bucket, measurement, 'url_hostname', label)
        if ts_data is None:
            continue
        time_grid, values = ts_data
        try:
            filtered = analyzer._apply_filter(values)  # Filter once
        except ValueError as e:
            # Zero-phase filtering rejects series shorter than its padding;
            # one short source must not abort the whole run.
            logger.warning(f"Skipping {label}: filtering failed: {str(e)}")
            continue
        results[label] = (time_grid, filtered)  # Store filtered data
    try:
        analyzer.global_threshold = analyzer.compute_global_threshold(
            [filtered for _, filtered in results.values()]
        )
        logger.info(f"Global threshold: {analyzer.global_threshold:.3f}")
    except ValueError as e:
        logger.error(str(e))
        return
    final_results = {}
    for label, (time_grid, filtered_values) in results.items():
        try:
            # Pass pre-filtered data to analysis
            fft_freqs, fft_amps, acf_lags = analyzer.analyze_source(filtered_values)
            candidates = analyzer.correlate_domains(fft_freqs, fft_amps, acf_lags)
            if candidates:
                final_results[label] = candidates
                logger.info(f"{label}: {len(candidates)} candidates")
        except Exception as e:
            logger.error(f"Analysis failed for {label}: {str(e)}")
    if final_results:
        visualizer.plot_results(final_results)
    else:
        logger.error("No results to visualize")
if __name__ == "__main__":
main()
\ No newline at end of file
import numpy as np
import matplotlib.pyplot as plt
from scipy.fft import fft, fftfreq
# (Ensure you import your InfluxDBClient if not already imported)
from influxdb_client import InfluxDBClient
# InfluxDB connection details, read as module globals by fetch_and_process_data.
url = "http://localhost:8086"
# NOTE(review): this API token is committed in plain text; it should be rotated
# and supplied from outside the source tree (environment / secrets store).
token = "WUxftnono0_k_t620srsO7xNG15xcej5meoShrr1ONHGvWSEqwg3gJVhthKwux7wUyw1_1hm9TAQFWKeEBHK2g=="
org = "Student"
# Function to fetch and process data
def fetch_and_process_data(bucket, measurement, filter_value, filter_field="url_hostname"):
    """Query InfluxDB for event times and turn them into a per-second 0/1 series.

    Uses the module-level ``url``, ``token`` and ``org`` to connect. Records of
    ``measurement`` in ``bucket`` with ``filter_field == filter_value`` inside
    the fixed window 2023-08-01 .. 2023-08-02 UTC are reduced to ``_time``.

    Returns:
        ``(time_grid, values)``: ``time_grid`` holds whole seconds counted
        from the earliest event, ``values`` holds 1.0 for every second with at
        least one event and 0.0 otherwise. ``(None, None)`` if nothing matched.
    """
    with InfluxDBClient(url=url, token=token, org=org) as client:
        query_api = client.query_api()
        query = f'''
from(bucket: "{bucket}")
|> range(start: 2023-08-01T00:00:00Z, stop: 2023-08-02T00:00:00Z)
|> filter(fn: (r) => r["_measurement"] == "{measurement}")
|> filter(fn: (r) => r["{filter_field}"] == "{filter_value}")
|> keep(columns: ["_time"])
'''
        timestamps = [record.get_time() for table in query_api.query(query) for record in table.records]
    if not timestamps:
        return None, None
    # Nothing in the query orders the records. Without sorting, an event that
    # precedes timestamps[0] yields a negative offset, and numpy would quietly
    # write it at the wrong (wrapped-around) position.
    timestamps.sort()
    first = timestamps[0]
    time_grid = np.arange(0, (timestamps[-1] - first).total_seconds() + 1, 1)
    values = np.zeros_like(time_grid, dtype=float)
    seconds = [int((t - first).total_seconds()) for t in timestamps]
    values[seconds] = 1
    return time_grid, values
# Fetch data for all buckets.
# NOTE: these calls execute at import time and each one queries InfluxDB.
# A call yields (None, None) when its query returns no records, so the
# values below may be None.
time_grid1, values1 = fetch_and_process_data("Net", "hostnames", "saml.allianz.com")
time_grid2, values2 = fetch_and_process_data("Net8", "beacon_activity", "example1.beacon.com", filter_field="url")
time_grid3, values3 = fetch_and_process_data("Net9", "hostnames", "m4v4r4c5.stackpathcdn.com")
# Function to apply Fourier Transform with color adjustments
def apply_fourier_transform(time_grid, values, label, color, linestyle='-', linewidth=2, alpha=1.0):
    """Plot the one-sided FFT amplitude spectrum of ``values`` on the current figure.

    Args:
        time_grid: accepted for call-site symmetry; not used (a sample
            spacing of 1.0 is assumed for the frequency axis).
        values: 1-D sequence of samples, or ``None`` when no data was fetched.
        label, color, linestyle, linewidth, alpha: forwarded to ``plt.plot``.

    A missing or empty series is skipped with a warning instead of raising.
    The y-axis is fixed to 0..600 after every call.
    """
    if values is None or len(values) == 0:
        import warnings
        warnings.warn(f"No data for {label}; skipping Fourier transform plot")
        return
    half = len(values) // 2
    freqs = fftfreq(len(values), d=1.0)[:half]
    amplitudes = np.abs(fft(values)[:half])
    # Apply amplitude limit only for saml.allianz.com
    if label == "(saml.allianz.com)":
        amplitudes = np.minimum(amplitudes, 50)
    plt.plot(freqs, amplitudes, label=label, color=color, linestyle=linestyle, linewidth=linewidth, alpha=alpha)
    plt.ylim(0, 600)
# Plot combined FFT results: all three spectra on one figure.
plt.figure(figsize=(10, 6))
apply_fourier_transform(time_grid1, values1, "(saml.allianz.com)", "red", linewidth=3) # Red, thick solid line
apply_fourier_transform(time_grid2, values2, "(example1.beacon.com)", "lightblue", alpha=0.5) # Light blue, semi-transparent (solid: no linestyle is passed)
apply_fourier_transform(time_grid3, values3, "(m4v4r4c5.stackpathcdn.com)", "green", alpha=0.5) # Green, semi-transparent
plt.title("Combined Frequency Spectrum (Fourier Transform)")
plt.xlabel("Frequency (Hz)")
plt.ylabel("Amplitude")
plt.legend()
plt.grid()
# Blocks until the window is closed.
plt.show()
# Function to apply Autocorrelation with adjusted visualization
def apply_autocorrelation(values, label, color, linestyle='-', linewidth=2, alpha=1.0):
    """Plot the normalised autocorrelation of ``values`` on the current figure.

    Args:
        values: 1-D sequence of samples, or ``None`` when no data was fetched.
        label, color, linestyle, linewidth, alpha: forwarded to ``plt.plot``.

    A missing or empty series is skipped with a warning instead of raising.
    """
    if values is None or len(values) == 0:
        import warnings
        warnings.warn(f"No data for {label}; skipping autocorrelation plot")
        return
    samples = np.asarray(values, dtype=float)
    count = len(samples)
    # Linear autocorrelation via the FFT, zero-padded to 2 * count so that no
    # lag wraps around. Matches np.correlate(..., mode='full')[count - 1:] up
    # to rounding while costing O(n log n) rather than O(n^2).
    power = np.abs(np.fft.rfft(samples, 2 * count)) ** 2
    autocorr = np.fft.irfft(power, 2 * count)[:count]
    top = np.max(autocorr)
    if top > 0:
        autocorr = autocorr / top  # Normalize so the zero-lag value is 1
    lags = np.arange(count)
    plt.plot(lags, autocorr, label=label, color=color, linestyle=linestyle, linewidth=linewidth, alpha=alpha)
# Plot combined Autocorrelation results with adjusted colors and styles
plt.figure(figsize=(10, 6))
apply_autocorrelation(values1, "(saml.allianz.com)", "red", linewidth=1) # Red, thin line (linewidth=1)
apply_autocorrelation(values2, "(example1.beacon.com)", "lightblue", linewidth=3) # Light blue, thick solid line (no linestyle or alpha is passed)
apply_autocorrelation(values3, "(m4v4r4c5.stackpathcdn.com)", "green") # Green, default width
plt.title("Combined Autocorrelation Function")
plt.xlabel("Sequence Offset")
plt.ylabel("Autocorrelation")
plt.legend()
plt.grid()
plt.xlim(0, 100) # Limit the lag axis to the first 100 offsets
plt.show()
Codes/FTT_autocorrelation/alltop.png

57.2 KiB

Codes/FTT_autocorrelation/artificialbeacons.png

104 KiB

Codes/FTT_autocorrelation/auto3.png

46.5 KiB

Codes/FTT_autocorrelation/auto3_100.png

73.4 KiB

Codes/FTT_autocorrelation/auto3_1000.png

92.2 KiB

0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment