Thursday, October 16, 2025

Detection of drones

hi everyone, i am trying to use gnuradio for drone detection but nothing is working, if anyone can help import numpy as np
from gnuradio import gr
import pmt
from threading import Timer, Lock

class freq_scanner(gr.sync_block):
    def __init__(self, start_freq=2.4e9, stop_freq=2.5e9, step_freq=5e6, dwell_time=0.5):
        gr.sync_block.__init__(self,
            name="Frequency Scanner",
            in_sig=None,
            out_sig=None) # Não precisamos mais da saída de sinal

        self.message_port_register_out(pmt.intern("cmd"))

        self._start_freq = float(start_freq)
        self._stop_freq = float(stop_freq)
        self._step_freq = float(step_freq)
        self._dwell_time = float(dwell_time)
        
        self.current_freq = self._start_freq
        
        # --- A "Caixa de Correio" agora guarda apenas um número (float) ---
        self.freq_to_send = None
        self.lock = Lock()

        # Iniciamos o timer
        self.timer = Timer(self._dwell_time, self.prepare_next_freq)
        self.timer.start()

    def prepare_next_freq(self):
        """Esta função é chamada pelo timer e APENAS CALCULA a frequência."""
        
        next_freq = self.current_freq + self._step_freq
        if next_freq > self._stop_freq:
            next_freq = self._start_freq
        
        self.current_freq = next_freq
        
        print(f"Timer preparou! Próxima Freq: {self.current_freq / 1e6:.2f} MHz", flush=True)
        
        # Coloca APENAS o número float na caixa de correio
        with self.lock:
            self.freq_to_send = self.current_freq

        # Reinicia o timer
        self.timer = Timer(self._dwell_time, self.prepare_next_freq)
        self.timer.start()

    def stop(self):
        self.timer.cancel()
        return True

    def work(self, input_items, output_items):
        """A função work agora faz TODO o trabalho de PMT e publicação."""
        
        local_freq_to_send = None
        # Pega o número da caixa de correio
        with self.lock:
            if self.freq_to_send is not None:
                local_freq_to_send = self.freq_to_send
                self.freq_to_send = None # Esvazia a caixa

        # Se havia um número para enviar...
        if local_freq_to_send is not None:
            # ...Cria o dicionário PMT AQUI, no thread principal...
            msg_dict = pmt.make_dict()
            msg_dict = pmt.dict_add(msg_dict, pmt.intern("freq"), pmt.from_double(local_freq_to_send))
            
            # ...e publica AQUI, no thread principal.
            self.message_port_pub(pmt.intern("cmd"), msg_dict)
        
        return 0  import numpy as np
from gnuradio import gr
import pmt
import scipy.special as scs
from threading import Lock

class Energy_Detector(gr.basic_block):
    def __init__(self, samples=1024, Pfa=0.01):
        self.samples = int(samples)
        self.Pfa = float(Pfa)

        gr.basic_block.__init__(self,
            name="Energy_Detector",
            in_sig=[(np.float32, self.samples), (np.float32, self.samples)],
            out_sig=None)

        self.message_port_register_out(pmt.intern("detection_in"))
        self.message_port_register_in(pmt.intern("freq_in"))
        # CORREÇÃO 1: Nome da função corrigido
        self.set_msg_handler(pmt.intern("freq_in"), self.handle_freq_msg)
        
        self.current_center_freq = 0.0 # Inicializa a frequência
        self.lock = Lock()

    def handle_freq_msg(self, msg):
        if pmt.is_dict(msg):
            freq_val = pmt.dict_ref(msg, pmt.intern("freq"), pmt.PMT_NIL)
            if not pmt.is_nil(freq_val):
                with self.lock:
                    self.current_center_freq = pmt.to_double(freq_val)

    def general_work(self, input_items, output_items):
        signal_energy_vector = input_items[0][0]
        noise_vector = input_items[1][0]

        signalAvg = np.mean(signal_energy_vector)
        
        NoisePower = noise_vector ** 2
        NoiseAvg = np.mean(NoisePower)
        var = np.var(NoisePower)
        stdev = np.sqrt(var)

        Qinv = np.sqrt(2) * scs.erfinv(1 - 2 * self.Pfa)
        Threshold = NoiseAvg + Qinv * stdev

        # CORREÇÃO 3: Lógica de detecção corrigida
        if signalAvg > Threshold:
            local_center_freq = 0.0
            with self.lock:
                local_center_freq = self.current_center_freq
            
            # Só envia a mensagem se a frequência for válida (maior que zero)
            if local_center_freq > 0:
                msg_tuple = pmt.make_tuple(
                    pmt.from_float(float(signalAvg)),
                    pmt.from_float(1.0),
                    pmt.from_double(local_center_freq)
                )
                self.message_port_pub(pmt.intern("detection_in"), msg_tuple)

        # CORREÇÃO 2: Consumo explícito das portas
        self.consume(0, 1)
        self.consume(1, 1)
        return 0     freq_hz = pmt.to_double(pmt.tuple_ref(msg, 2))
                freq_mhz = freq_hz / 1e6

                # Imprime o alerta formatado no terminal
                print("="*40)
                print("!!! ALERTA: SINAL DE DRONE DETECTADO !!!")
                print(f"    Frequência: {freq_mhz:.2f} MHz")
                print(f"    Energia do Sinal: {energy:.4f}")
                print("="*40, flush=True) # flush=True para garantir a impressão imediata

        except Exception as e:
            print(f"Alert Formatter - Erro ao processar mensagem: {e}")
            print(f"Mensagem recebida: {msg}")

    def work(self, input_items, output_items):
        # Este bloco não processa sinais de streaming, apenas mensagens.
        # Portanto, a função work não faz nada.
        return 0   

No comments:

Post a Comment