# Interactive DOA Explorer

Interactive playback and visualization of MUSIC spectrum data.

**Run modes:**
- In Jupyter: Just run the cells
- Standalone app: `panel serve 19_interactive_doa_explorer.ipynb`

In [25]:
import numpy as np
import panel as pn
import plotly.graph_objects as go
import param
from scipy.fft import fft
import os

# Initialize Panel with Plotly support
pn.extension('plotly')

# Import constants
import importlib
import constants
importlib.reload(constants)
from constants import create_rotated_uca, CHANNEL_ORDER, RADIUS, C
from doa_py.algorithm.music_based import music

print("Extensions loaded!")

Extensions loaded!


In [26]:
# Data loading functions
def load_cs16(filepath):
    raw = np.fromfile(filepath, dtype=np.int16)
    return raw[0::2] + 1j * raw[1::2]

def load_measurement(base_dir, angle):
    data_dir = os.path.join(base_dir, f"{angle}deg")
    channels = []
    for i in range(4):
        filepath = os.path.join(data_dir, f"channel-{i}.cs16")
        channels.append(load_cs16(filepath))
    raw_data = np.vstack(channels)
    return raw_data[CHANNEL_ORDER]

def extract_tone_snapshots(X, n_snapshots=2048, fft_size=1024):
    n_samples = X.shape[1]
    fft_full = fft(X[0])
    peak_idx = np.argmax(np.abs(fft_full[1:n_samples//2])) + 1
    norm_freq = peak_idx / n_samples
    tone_bin = int(norm_freq * fft_size)
    
    snapshots = []
    hop = fft_size // 2
    for start in range(0, n_samples - fft_size, hop):
        if len(snapshots) >= n_snapshots:
            break
        segment = X[:, start:start + fft_size]
        fft_seg = fft(segment, axis=1)
        snapshots.append(fft_seg[:, tone_bin])
    return np.array(snapshots).T

def circular_error(est, true):
    err = est - true
    while err > 180: err -= 360
    while err < -180: err += 360
    return err

print("Functions defined!")

Functions defined!


In [27]:
# Dataset configurations
DATASETS = {
    '1200MHz_10deg': {'freq_hz': 1.2e9, 'data_dir': '../data/1200MHz, 0dB, 10deg increments, outside', 'step': 10},
    '1200MHz_1deg':  {'freq_hz': 1.2e9, 'data_dir': '../data/1200MHz, 0dB, 1deg increments, outside', 'step': 1},
    '1500MHz_10deg': {'freq_hz': 1.5e9, 'data_dir': '../data/1500MHz, 0dB, 10deg increments, outside', 'step': 10},
    '2400MHz_10deg': {'freq_hz': 2.4e9, 'data_dir': '../data/2400MHz, 0dB, 10deg increments, outside', 'step': 10},
    '5700MHz_10deg': {'freq_hz': 5.7e9, 'data_dir': '../data/5700MHz, 0dB, 10deg increments, outside', 'step': 10},
    '5700MHz_1deg':  {'freq_hz': 5.7e9, 'data_dir': '../data/5700MHz, 0dB, 1deg increments, outside', 'step': 1},
}

# Check which datasets exist
available_datasets = {}
for name, config in DATASETS.items():
    if os.path.exists(config['data_dir']):
        available_datasets[name] = config
        print(f"  {name}: Available")
    else:
        print(f"  {name}: Not found")

print(f"\n{len(available_datasets)} datasets available")

  1200MHz_10deg: Available
  1200MHz_1deg: Available
  1500MHz_10deg: Available
  2400MHz_10deg: Available
  5700MHz_10deg: Available
  5700MHz_1deg: Available

6 datasets available


In [28]:
class DOAExplorer(param.Parameterized):
    """Interactive DOA spectrum explorer."""
    
    dataset = param.Selector(default=list(available_datasets.keys())[0], 
                             objects=list(available_datasets.keys()),
                             doc="Select dataset")
    
    angle_idx = param.Integer(default=0, bounds=(0, 35), doc="Angle index")
    
    playing = param.Boolean(default=False, doc="Playback active")
    
    playback_speed = param.Integer(default=500, bounds=(100, 2000), step=100,
                                   doc="Playback interval (ms)")
    
    def __init__(self, **params):
        super().__init__(**params)
        self.uca = create_rotated_uca()
        self.angle_grids = np.arange(-180, 180, 1)
        self.cache = {}
        self._load_dataset()
        self._periodic_callback = None
    
    @param.depends('dataset', watch=True)
    def _load_dataset(self):
        """Load dataset and precompute spectra."""
        config = available_datasets[self.dataset]
        self.freq_hz = config['freq_hz']
        self.data_dir = config['data_dir']
        self.angle_step = config['step']
        self.wavelength = C / self.freq_hz
        
        self.angles = sorted([int(d.replace('deg', '')) 
                             for d in os.listdir(self.data_dir) 
                             if d.endswith('deg')])
        
        self.param.angle_idx.bounds = (0, len(self.angles) - 1)
        if self.angle_idx >= len(self.angles):
            self.angle_idx = 0
        
        self.cache = {}
        print(f"Loaded {self.dataset}: {len(self.angles)} angles")
    
    def _compute_spectrum(self, angle):
        """Compute MUSIC spectrum for an angle (with caching)."""
        cache_key = (self.dataset, angle)
        if cache_key in self.cache:
            return self.cache[cache_key]
        
        try:
            X = load_measurement(self.data_dir, angle)
            snapshots = extract_tone_snapshots(X, n_snapshots=2048)
            
            spectrum = music(
                received_data=snapshots,
                num_signal=1,
                array=self.uca,
                signal_fre=self.freq_hz,
                angle_grids=self.angle_grids,
                unit="deg"
            )
            
            est = self.angle_grids[np.argmax(spectrum)]
            true_adj = angle if angle <= 180 else angle - 360
            err = circular_error(est, true_adj)
            
            result = {
                'spectrum': spectrum,
                'estimate': est,
                'true_angle': angle,
                'error': err
            }
            self.cache[cache_key] = result
            return result
        except Exception as e:
            print(f"Error at {angle}: {e}")
            return None
    
    @param.depends('angle_idx', 'dataset')
    def polar_plot(self):
        """Generate polar plot of MUSIC spectrum using Plotly."""
        angle = self.angles[self.angle_idx]
        result = self._compute_spectrum(angle)
        
        if result is None:
            fig = go.Figure()
            fig.add_annotation(text="Error loading data", x=0.5, y=0.5)
            return fig
        
        spectrum = result['spectrum']
        est = result['estimate']
        err = result['error']
        true_adj = angle if angle <= 180 else angle - 360
        
        # Convert to dB and normalize for better lobe visualization
        spectrum_db = 10 * np.log10(spectrum / np.max(spectrum) + 1e-10)
        # Shift to 0-1 range: -40dB -> 0, 0dB -> 1
        spectrum_norm = np.clip((spectrum_db + 40) / 40, 0, 1)
        
        # Convert angles to 0-360 range for Plotly
        theta_plot = np.where(self.angle_grids < 0, self.angle_grids + 360, self.angle_grids)
        
        # Sort by theta for proper rendering
        sort_idx = np.argsort(theta_plot)
        theta_sorted = theta_plot[sort_idx]
        spectrum_sorted = spectrum_norm[sort_idx]
        
        # Close the loop for proper fill
        theta_closed = np.append(theta_sorted, theta_sorted[0])
        spectrum_closed = np.append(spectrum_sorted, spectrum_sorted[0])
        
        # Create polar figure
        fig = go.Figure()
        
        # MUSIC spectrum (blue filled area)
        fig.add_trace(go.Scatterpolar(
            r=spectrum_closed,
            theta=theta_closed,
            mode='lines',
            fill='toself',
            fillcolor='rgba(0, 100, 255, 0.3)',
            line=dict(color='blue', width=2),
            name='MUSIC Spectrum'
        ))
        
        # Convert marker angles to 0-360
        true_theta = true_adj if true_adj >= 0 else true_adj + 360
        est_theta = est if est >= 0 else est + 360
        
        # True angle (green line)
        fig.add_trace(go.Scatterpolar(
            r=[0, 1.05],
            theta=[true_theta, true_theta],
            mode='lines+markers',
            line=dict(color='green', width=4),
            marker=dict(size=[0, 12], symbol='triangle-up', color='green'),
            name=f'True: {angle}°'
        ))
        
        # Estimated angle (red dashed line)
        fig.add_trace(go.Scatterpolar(
            r=[0, 1.05],
            theta=[est_theta, est_theta],
            mode='lines+markers',
            line=dict(color='red', width=3, dash='dash'),
            marker=dict(size=[0, 10], symbol='x', color='red'),
            name=f'Est: {est}°'
        ))
        
        # Error color
        if abs(err) < 20:
            title_color = 'green'
        elif abs(err) < 45:
            title_color = 'orange'
        else:
            title_color = 'red'
        
        fig.update_layout(
            polar=dict(
                radialaxis=dict(
                    visible=True, 
                    range=[0, 1.1],
                    tickvals=[0.25, 0.5, 0.75, 1.0],
                    ticktext=['-30dB', '-20dB', '-10dB', '0dB']
                ),
                angularaxis=dict(
                    direction="clockwise",
                    rotation=90,  # 0° at top
                    dtick=30
                )
            ),
            showlegend=True,
            legend=dict(x=1.05, y=1, font=dict(size=10)),
            title=dict(
                text=f'True: {angle}° | Est: {est}° | Err: <span style="color:{title_color}">{err:+.0f}°</span>',
                x=0.5,
                font=dict(size=14)
            ),
            width=580,
            height=520,
            margin=dict(l=60, r=160, t=80, b=60)
        )
        
        return fig
    
    @param.depends('angle_idx', 'dataset')
    def spectrum_plot(self):
        """Generate linear spectrum plot using Plotly."""
        angle = self.angles[self.angle_idx]
        result = self._compute_spectrum(angle)
        
        if result is None:
            fig = go.Figure()
            fig.add_annotation(text="Error loading data", x=0.5, y=0.5)
            return fig
        
        spectrum = result['spectrum']
        est = result['estimate']
        true_adj = angle if angle <= 180 else angle - 360
        
        # Convert to dB
        spectrum_db = 10 * np.log10(spectrum / np.max(spectrum) + 1e-10)
        
        fig = go.Figure()
        
        # Spectrum curve
        fig.add_trace(go.Scatter(
            x=self.angle_grids,
            y=spectrum_db,
            mode='lines',
            line=dict(color='blue', width=1.5),
            name='MUSIC Spectrum'
        ))
        
        # True angle line
        fig.add_vline(x=true_adj, line=dict(color='green', width=3), 
                      annotation_text=f"True: {angle}°")
        
        # Estimated angle line
        fig.add_vline(x=est, line=dict(color='red', width=2, dash='dash'),
                      annotation_text=f"Est: {est}°")
        
        fig.update_layout(
            xaxis_title="Angle (degrees)",
            yaxis_title="Power (dB)",
            xaxis=dict(range=[-180, 180], dtick=30),
            yaxis=dict(range=[-40, 5]),
            title="MUSIC Spectrum",
            width=600,
            height=300,
            margin=dict(l=60, r=40, t=50, b=50),
            showlegend=False
        )
        
        return fig
    
    @param.depends('angle_idx', 'dataset')
    def info_panel(self):
        """Display current info."""
        angle = self.angles[self.angle_idx]
        result = self._compute_spectrum(angle)
        
        if result is None:
            return pn.pane.Markdown("**Error loading data**")
        
        err = result['error']
        if abs(err) < 20:
            color, status = 'green', 'Good'
        elif abs(err) < 45:
            color, status = 'orange', 'Moderate'
        else:
            color, status = 'red', 'Poor'
        
        info = f"""
### Current Measurement
- **Dataset**: {self.dataset}
- **Frequency**: {self.freq_hz/1e9:.1f} GHz
- **True Angle**: {angle}°
- **Estimate**: {result['estimate']}°
- **Error**: <span style="color:{color}; font-weight:bold">{err:+.0f}° ({status})</span>
- **Angle {self.angle_idx + 1} of {len(self.angles)}**
"""
        return pn.pane.Markdown(info)
    
    def _advance(self):
        """Advance to next angle during playback."""
        if self.playing:
            self.angle_idx = (self.angle_idx + 1) % len(self.angles)
    
    @param.depends('playing', watch=True)
    def _toggle_playback(self):
        """Handle play/pause toggle."""
        if self.playing:
            if self._periodic_callback is None:
                self._periodic_callback = pn.state.add_periodic_callback(
                    self._advance, period=self.playback_speed
                )
        else:
            if self._periodic_callback is not None:
                self._periodic_callback.stop()
                self._periodic_callback = None
    
    @param.depends('playback_speed', watch=True)
    def _update_speed(self):
        """Update playback speed."""
        if self._periodic_callback is not None:
            self._periodic_callback.period = self.playback_speed

print("DOAExplorer class defined!")

DOAExplorer class defined!


In [29]:
# Create the explorer instance
explorer = DOAExplorer()

# Build the dashboard layout
controls = pn.Column(
    pn.pane.Markdown("## Controls"),
    pn.widgets.Select.from_param(explorer.param.dataset, name='Dataset'),
    pn.widgets.IntSlider.from_param(explorer.param.angle_idx, name='Angle Index'),
    pn.Row(
        pn.widgets.Toggle.from_param(explorer.param.playing, name='Play'),
        pn.widgets.IntSlider.from_param(explorer.param.playback_speed, name='Speed (ms)')
    ),
    explorer.info_panel,
    width=320
)

# Use pn.bind with proper argument handling
def get_polar_plot(angle_idx, dataset):
    return pn.pane.Plotly(explorer.polar_plot(), config={'displayModeBar': False})

def get_spectrum_plot(angle_idx, dataset):
    return pn.pane.Plotly(explorer.spectrum_plot(), config={'displayModeBar': False})

polar_pane = pn.bind(get_polar_plot, explorer.param.angle_idx, explorer.param.dataset)
spectrum_pane = pn.bind(get_spectrum_plot, explorer.param.angle_idx, explorer.param.dataset)

plots = pn.Column(
    polar_pane,
    spectrum_pane
)

# Main layout
app = pn.Row(
    controls,
    plots,
    sizing_mode='stretch_width'
)

print("App created! Run the next cell to display.")

Loaded 1200MHz_10deg: 36 angles
App created! Run the next cell to display.


In [30]:
# Display the app
# In Jupyter: displays inline
# For standalone: run `panel serve 19_interactive_doa_explorer.ipynb`
app.servable(title="DOA Explorer")

---

## Legend

**Polar Plot:**
- Blue area: MUSIC pseudo-spectrum (normalized)
- Green line: True angle (ground truth)
- Red dashed line: Estimated angle (spectrum peak)

**Linear Spectrum:**
- Blue curve: MUSIC spectrum in dB
- Green vertical: True angle
- Red dashed vertical: Estimated angle

**Error Categories:**
- Green (<20°): Good
- Orange (20-45°): Moderate  
- Red (>45°): Poor

---

### Running as Standalone App

```bash
cd notebooks
panel serve 19_interactive_doa_explorer.ipynb --show
```