import logging
import numpy as np
import threading
import time
import json
import requests
import os
from math import isclose
from queue import Empty, Queue
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Tuple
logger = logging.getLogger("SignalIntelligence")
# Custom JSON encoder for numpy types
class NumpyJSONEncoder(json.JSONEncoder):
"""JSON encoder that properly handles numpy types"""
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
return super().default(obj)
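# Illustrative usage (assumption: callers serialize system output such as
# get_rf_environment(), which may contain numpy scalars or arrays):
#
#   payload = {"power_dbm": np.float64(-72.5), "bins": np.arange(3)}
#   json.dumps(payload, cls=NumpyJSONEncoder)  # -> '{"power_dbm": -72.5, "bins": [0, 1, 2]}'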
@dataclass
class GeoPosition:
"""Geographic position data"""
latitude: float
longitude: float
altitude: Optional[float] = None
accuracy: Optional[float] = None
timestamp: float = field(default_factory=time.time)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for serialization"""
return {
"lat": float(self.latitude),
"lon": float(self.longitude),
"alt": float(self.altitude) if self.altitude is not None else None,
"accuracy": float(self.accuracy) if self.accuracy is not None else None,
"timestamp": float(self.timestamp)
}
@dataclass
class RFSignal:
"""RF Signal data structure"""
id: str
timestamp: float
frequency: float
bandwidth: float
power: float
iq_data: np.ndarray
source: str
classification: Optional[str] = None
confidence: float = 0.0
metadata: Dict[str, Any] = field(default_factory=dict)
geo_position: Optional[GeoPosition] = None
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for JSON serialization (excludes IQ data)"""
result = {
"id": self.id,
"timestamp": float(self.timestamp),
"frequency": float(self.frequency),
"frequency_mhz": float(self.frequency / 1e6),
"bandwidth": float(self.bandwidth),
"bandwidth_khz": float(self.bandwidth / 1e3),
"power": float(self.power),
"source": self.source,
"classification": self.classification,
"confidence": float(self.confidence),
"metadata": self.metadata
}
# Add geo position if available
if self.geo_position:
result["geo_position"] = self.geo_position.to_dict()
return result
class SignalIntelligenceSystem:
"""Main Signal Intelligence System class"""
def __init__(self, config, comm_network):
self.config = config
self.comm_network = comm_network
self.running = False
self.signal_queue = Queue()
self.processed_signals = []
self.noise_floor = -120 # Default noise floor in dBm
self.geo_visualization_url = None
self.geo_areas_of_operation = []
# ATL/TWPA design-aware processing
self.atl_design = None
self.recent_freqs_hz = [] # small FIFO for mixing checks
# Initialize core components
self._initialize_components()
# Setup geo visualization integration if available
self._setup_geo_integration()
# Load ATL/TWPA design configuration
self._load_atl_design()
def _initialize_components(self):
"""Initialize signal intelligence components"""
# This will be implemented in subclasses or extended
pass
def _setup_geo_integration(self):
"""Setup integration with geographic visualization"""
try:
# Check for geo visualization configuration
geo_config_path = "config/geo_visualization.json"
if os.path.exists(geo_config_path):
with open(geo_config_path, "r") as f:
geo_config = json.load(f)
# Extract server information
server_config = geo_config.get("server", {})
host = server_config.get("host", "localhost")
port = server_config.get("port", 5050)
# Set geo visualization URL
self.geo_visualization_url = f"http://{host}:{port}/api/signals"
# Load areas of operation
ao_config = geo_config.get("areas_of_operation", {})
ao_presets = ao_config.get("presets", [])
if ao_presets:
self.geo_areas_of_operation = ao_presets
logger.info(f"Loaded {len(ao_presets)} Areas of Operation for geo visualization")
logger.info(f"Geographic visualization integration configured at {self.geo_visualization_url}")
except Exception as e:
logger.warning(f"Failed to configure geo visualization integration: {e}")
def _load_atl_design(self):
"""Load optional ATL/TWPA design facts from Arxiv 2510.24753v1."""
try:
path = "config/atl_design.json"
if os.path.exists(path):
with open(path, "r") as f:
d = json.load(f)
# normalize expected keys
self.atl_design = {
"pump_hz": float(d.get("pump_hz")) if d.get("pump_hz") else None,
"rpm_notch_hz": float(d.get("rpm_notch_hz")) if d.get("rpm_notch_hz") else None,
"rpm_pole_hz": float(d.get("rpm_pole_hz")) if d.get("rpm_pole_hz") else None,
"stopbands": [
{"center_hz": float(sb["center_hz"]), "width_hz": float(sb["width_hz"])}
for sb in d.get("stopbands", [])
],
"mixing_mode": d.get("mixing_mode", "4WM") # or "3WM"
}
logger.info("ATL/TWPA design loaded from config/atl_design.json")
except Exception as e:
logger.warning(f"Failed to load ATL design: {e}")
def _label_atl_band(self, f_hz: float, tol_hz: float = 0.01e9):
"""Return band label based on design facts from ATL synthesis."""
if not self.atl_design:
return "unknown", {}
d = self.atl_design
info = {}
# near rpm notch / pole (phase-matching feature for 4WM)
if d.get("rpm_notch_hz") and abs(f_hz - d["rpm_notch_hz"]) <= tol_hz:
info["near_rpm_notch"] = True
return "near_notch", info
if d.get("rpm_pole_hz") and abs(f_hz - d["rpm_pole_hz"]) <= tol_hz:
info["near_rpm_pole"] = True
# still a passband point, but important
return "passband", info
# stopbands (e.g., wide 3fp gap)
for sb in d.get("stopbands", []):
if abs(f_hz - sb["center_hz"]) <= sb["width_hz"] / 2:
info["stopband_center_hz"] = sb["center_hz"]
return "stopband", info
return "passband", info
def _mixing_relations(self, f_hz: float, ppm: float = 150.0):
"""Compute candidate mixing partners to annotate metadata."""
if not self.atl_design or not self.atl_design.get("pump_hz"):
return {}
fp = self.atl_design["pump_hz"]
tol = fp * ppm * 1e-6 # parts-per-million window
rel = {"near_3fp": False, "idlers": []}
# third harmonic guard (esp. for KTWPA)
if abs(f_hz - 3.0 * fp) <= tol:
rel["near_3fp"] = True
# 4WM: idlers around (2fp - fs) and (2fp + fs)
if self.atl_design.get("mixing_mode", "4WM").upper() == "4WM":
for fs in self.recent_freqs_hz[-64:]:
i1 = abs((2.0 * fp) - fs) # 2fp - fs
i2 = abs((2.0 * fp) + fs) # rarely in band, still annotate
if abs(f_hz - i1) <= tol:
rel["idlers"].append({"mode": "4WM", "expr": "2fp - fs", "fs_hz": fs, "idler_hz": i1})
if abs(f_hz - i2) <= tol:
rel["idlers"].append({"mode": "4WM", "expr": "2fp + fs", "fs_hz": fs, "idler_hz": i2})
# 3WM: idlers around (fp - fs) and (fp + fs)
else:
for fs in self.recent_freqs_hz[-64:]:
i1 = abs(fp - fs) # usual forward idler
i2 = abs(fp + fs)
if abs(f_hz - i1) <= tol:
rel["idlers"].append({"mode": "3WM", "expr": "fp - fs", "fs_hz": fs, "idler_hz": i1})
if abs(f_hz - i2) <= tol:
rel["idlers"].append({"mode": "3WM", "expr": "fp + fs", "fs_hz": fs, "idler_hz": i2})
return rel
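    # Worked example of the relations above (illustrative numbers): with a 4WM design
    # and pump_hz = 6.0 GHz, a recently seen signal at fs = 5.2 GHz predicts an idler
    # near 2*fp - fs = 6.8 GHz, so a new detection at ~6.8 GHz (within the ppm window,
    # 150 ppm of 6 GHz = 0.9 MHz) is annotated with {"expr": "2fp - fs", ...}. In 3WM
    # mode the analogous idlers sit near fp - fs = 0.8 GHz and fp + fs = 11.2 GHz.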
def annotate_signal_with_atl(self, signal: "RFSignal"):
"""Attach ATL/TWPA labels to signal.metadata (no-op if no design)."""
try:
# keep a small memory for mixing checks
self.recent_freqs_hz.append(float(signal.frequency))
if len(self.recent_freqs_hz) > 512:
self.recent_freqs_hz = self.recent_freqs_hz[-256:]
band_label, band_info = self._label_atl_band(signal.frequency)
mix_info = self._mixing_relations(signal.frequency)
signal.metadata.setdefault("atl", {})
signal.metadata["atl"].update({
"band_label": band_label,
**band_info,
**mix_info
})
# Log important ATL events
if band_info.get("near_rpm_notch") or mix_info.get("near_3fp") or band_label == "stopband":
logger.info(f"ATL event detected - Signal {signal.id}: {band_label}, near_3fp: {mix_info.get('near_3fp', False)}")
except Exception as e:
logger.debug(f"ATL annotate failed: {e}")
def process_atl_alerts(self, signal: "RFSignal"):
"""Process ATL-related alerts and update classifications as needed."""
if not self.atl_design or "atl" not in signal.metadata:
return
atl_data = signal.metadata["atl"]
# Check for important ATL events that warrant classification updates
alert_conditions = []
if atl_data.get("near_3fp"):
alert_conditions.append("near_3fp_harmonic")
if atl_data.get("band_label") == "stopband":
alert_conditions.append("in_designed_stopband")
if atl_data.get("near_rpm_notch"):
alert_conditions.append("near_phase_matching_notch")
if atl_data.get("idlers"):
alert_conditions.append(f"parametric_mixing_detected({len(atl_data['idlers'])})")
# Update classification if any alert conditions are met
if alert_conditions:
new_classification = f"ATL_Event: {', '.join(alert_conditions)}"
self.update_signal_classification(
signal.id,
new_classification,
0.85, # High confidence for design-based detection
update_info={"atl": atl_data, "alert_conditions": alert_conditions}
)
def send_signal_to_geo_visualization(self, signal: RFSignal) -> bool:
"""Send signal to geographic visualization system if available"""
if not self.geo_visualization_url:
return False
# Skip signals without geo position
if not signal.geo_position:
return False
try:
# Convert signal to dict
signal_dict = signal.to_dict()
# Send to geo visualization
response = requests.post(
f"{self.geo_visualization_url}/add",
json=signal_dict,
headers={"Content-Type": "application/json"},
timeout=1
)
if response.status_code == 200:
logger.debug(f"Sent signal {signal.id} to geo visualization")
return True
else:
logger.warning(f"Failed to send signal to geo visualization: {response.status_code}")
return False
except Exception as e:
logger.warning(f"Error sending signal to geo visualization: {e}")
return False
def get_signals(self, start_time=None, end_time=None, min_frequency=None, max_frequency=None, signal_id=None):
"""
Get all processed signals with optional filtering
Args:
start_time (float, optional): Filter signals after this timestamp
end_time (float, optional): Filter signals before this timestamp
min_frequency (float, optional): Filter signals above this frequency (Hz)
max_frequency (float, optional): Filter signals below this frequency (Hz)
signal_id (str, optional): Get a specific signal by ID
Returns:
list: List of signal dictionaries
"""
# Apply filters
filtered_signals = self.processed_signals.copy()
# Filter by ID if provided
if signal_id:
filtered_signals = [s for s in filtered_signals if s.id == signal_id]
# Apply time filters
if start_time is not None:
filtered_signals = [s for s in filtered_signals if s.timestamp >= start_time]
if end_time is not None:
filtered_signals = [s for s in filtered_signals if s.timestamp <= end_time]
# Apply frequency filters
if min_frequency is not None:
filtered_signals = [s for s in filtered_signals if s.frequency >= min_frequency]
if max_frequency is not None:
filtered_signals = [s for s in filtered_signals if s.frequency <= max_frequency]
# Convert RFSignal objects to dictionaries for JSON serialization
signals_list = []
for signal in filtered_signals:
signals_list.append(signal.to_dict())
return signals_list
def update_signal_classification(self, signal_id, classification, confidence, update_info=None):
"""
Update the classification of a signal
Args:
signal_id (str): ID of the signal to update
classification (str): New classification
confidence (float): New confidence value
update_info (dict, optional): Additional information about the update
Returns:
bool: True if the signal was updated, False otherwise
"""
# Find the signal
for signal in self.processed_signals:
if signal.id == signal_id:
# Store old values
old_classification = signal.classification
old_confidence = signal.confidence
# Update classification
signal.classification = classification
signal.confidence = confidence
# Add to classification history
if "classification_history" not in signal.metadata:
signal.metadata["classification_history"] = []
history_entry = {
"timestamp": time.time(),
"old_classification": old_classification,
"new_classification": classification,
"old_confidence": old_confidence,
"new_confidence": confidence
}
# Add update info if provided
if update_info:
history_entry.update(update_info)
signal.metadata["classification_history"].append(history_entry)
# Re-annotate with ATL if this is an ATL-related update
if update_info and "atl" in update_info:
self.annotate_signal_with_atl(signal)
# Log the update
logger.info(f"Updated signal {signal_id} classification: {old_classification} → {classification} (confidence: {confidence:.2f})")
# Publish update to communication network if available
if hasattr(self, "comm_network") and self.comm_network:
self.comm_network.publish(
"signal_classification_updated",
{
"signal_id": signal_id,
"old_classification": old_classification,
"new_classification": classification,
"old_confidence": old_confidence,
"new_confidence": confidence,
"update_info": update_info
},
sender="signal_intelligence"
)
return True
# Signal not found
logger.warning(f"Cannot update classification: Signal {signal_id} not found")
return False
def get_rf_environment(self):
"""
Get the current RF environment information
Returns:
dict: RF environment data
"""
# Get current time
current_time = time.time()
# Find active signals (last 60 seconds)
active_signals = [s for s in self.processed_signals if current_time - s.timestamp < 60]
# Group signals by frequency bands
frequency_bands = {}
for signal in active_signals:
# Convert to MHz for readability
freq_mhz = signal.frequency / 1_000_000
band_key = int(freq_mhz / 100) * 100 # Group by 100 MHz bands
if band_key not in frequency_bands:
frequency_bands[band_key] = {
"center_frequency_mhz": band_key + 50, # Center of the band
"bandwidth_mhz": 100,
"signals": [],
"power_values": [],
"classifications": set()
}
frequency_bands[band_key]["signals"].append(signal)
frequency_bands[band_key]["power_values"].append(signal.power)
if signal.classification and signal.classification != "Unknown":
frequency_bands[band_key]["classifications"].add(signal.classification)
# Format frequency bands for output
formatted_bands = []
for band_key, band_data in frequency_bands.items():
band_dict = {
"center_frequency_mhz": band_data["center_frequency_mhz"],
"bandwidth_mhz": band_data["bandwidth_mhz"],
"power_dbm": np.mean(band_data["power_values"]) if band_data["power_values"] else -120,
"signal_count": len(band_data["signals"]),
"protocols_detected": list(band_data["classifications"])
}
# Add ATL band labeling if design is present
if self.atl_design:
center = band_data["center_frequency_mhz"] * 1e6
label, _ = self._label_atl_band(center, tol_hz=0.02e9)
band_dict["atl_band_label"] = label
formatted_bands.append(band_dict)
# Calculate spectrum statistics
spectrum_data = {
"timestamp": current_time,
"min_frequency_mhz": min([b["center_frequency_mhz"] - b["bandwidth_mhz"]/2 for b in formatted_bands]) if formatted_bands else 0,
"max_frequency_mhz": max([b["center_frequency_mhz"] + b["bandwidth_mhz"]/2 for b in formatted_bands]) if formatted_bands else 0,
"avg_noise_floor_dbm": self.noise_floor,
"frequency_bands": formatted_bands
}
return spectrum_data
def start(self):
"""Start Signal Intelligence System"""
logger.info("Starting Signal Intelligence System")
self.running = True
# Start signal processing thread
processing_thread = threading.Thread(target=self._signal_processing_loop)
processing_thread.daemon = True
processing_thread.start()
    def _signal_processing_loop(self):
        """Main signal processing loop"""
        while self.running:
            try:
                # Block briefly for the next signal; Empty simply means the queue is idle
                signal_data = self.signal_queue.get(timeout=1)
            except Empty:
                continue
            try:
                self.process_signal(signal_data)
            except Exception as e:
                logger.error(f"Error in signal processing: {e}")
            finally:
                self.signal_queue.task_done()
def process_signal(self, signal_data):
"""Process incoming signal data - to be implemented by subclasses"""
raise NotImplementedError("Subclasses must implement process_signal method")
def shutdown(self):
"""Shutdown the system"""
logger.info("Shutting down Signal Intelligence System")
self.running = False
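
# Minimal usage sketch. The base class defers signal ingestion to subclasses, so the
# demo below supplies a hypothetical process_signal() that wraps incoming dicts into
# RFSignal records, annotates them against the ATL design (a no-op when
# config/atl_design.json is absent), and stores them. The config and comm_network
# arguments are stand-ins, not real components.
if __name__ == "__main__":
    import uuid

    logging.basicConfig(level=logging.INFO)

    class DemoSignalIntelligence(SignalIntelligenceSystem):
        def process_signal(self, signal_data):
            # Wrap the raw dict into an RFSignal, annotate, and record it
            signal = RFSignal(
                id=str(uuid.uuid4()),
                timestamp=time.time(),
                frequency=float(signal_data["frequency"]),
                bandwidth=float(signal_data.get("bandwidth", 25e3)),
                power=float(signal_data.get("power", -80.0)),
                iq_data=np.zeros(1, dtype=np.complex64),
                source=signal_data.get("source", "demo"),
            )
            self.annotate_signal_with_atl(signal)
            self.processed_signals.append(signal)

    system = DemoSignalIntelligence(config={}, comm_network=None)
    system.start()
    system.signal_queue.put({"frequency": 2.45e9, "power": -62.0, "source": "demo"})
    time.sleep(1.5)
    print(json.dumps(system.get_rf_environment(), cls=NumpyJSONEncoder, indent=2))
    system.shutdown()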