Hi Pedro,
"detecing drones" feels like an overall complicated problem, and we can't know what you
mean by "nothing is working", I'm afraid!
I see you have multiple out-of-tree blocks (some or all of which you have written
yourself?) in your flow graph – cool!
So, go ahead and make sure they work individually. Especially, build something that allows
you to test your "Energy Detector" block without an SDR, in pure software!
I haven't done an intense reading of your Python code, but there's definitely a syntax
error in there – so you really need to make sure all things work in isolation before
plugging them together.
Best,
Marcus
On 10/16/25 4:13 PM, Pedro Tapia wrote:
> hi everyone, i am trying to use gnuradio for drone detection but nothing is working, if
> anyone can help import numpy as np
> from gnuradio import gr
> import pmt
> from threading import Timer, Lock
>
> class freq_scanner(gr.sync_block):
> def __init__(self, start_freq=2.4e9, stop_freq=2.5e9, step_freq=5e6, dwell_time=0.5):
> gr.sync_block.__init__(self,
> name="Frequency Scanner",
> in_sig=None,
> out_sig=None) # Não precisamos mais da saída de sinal
>
> self.message_port_register_out(pmt.intern("cmd"))
>
> self._start_freq = float(start_freq)
> self._stop_freq = float(stop_freq)
> self._step_freq = float(step_freq)
> self._dwell_time = float(dwell_time)
> self.current_freq = self._start_freq
> # --- A "Caixa de Correio" agora guarda apenas um número (float) ---
> self.freq_to_send = None
> self.lock = Lock()
>
> # Iniciamos o timer
> self.timer = Timer(self._dwell_time, self.prepare_next_freq)
> self.timer.start()
>
> def prepare_next_freq(self):
> """Esta função é chamada pelo timer e APENAS CALCULA a frequência."""
> next_freq = self.current_freq + self._step_freq
> if next_freq > self._stop_freq:
> next_freq = self._start_freq
> self.current_freq = next_freq
> print(f"Timer preparou! Próxima Freq: {self.current_freq / 1e6:.2f} MHz", flush=True)
> # Coloca APENAS o número float na caixa de correio
> with self.lock:
> self.freq_to_send = self.current_freq
>
> # Reinicia o timer
> self.timer = Timer(self._dwell_time, self.prepare_next_freq)
> self.timer.start()
>
> def stop(self):
> self.timer.cancel()
> return True
>
> def work(self, input_items, output_items):
> """A função work agora faz TODO o trabalho de PMT e publicação."""
> local_freq_to_send = None
> # Pega o número da caixa de correio
> with self.lock:
> if self.freq_to_send is not None:
> local_freq_to_send = self.freq_to_send
> self.freq_to_send = None # Esvazia a caixa
>
> # Se havia um número para enviar...
> if local_freq_to_send is not None:
> # ...Cria o dicionário PMT AQUI, no thread principal...
> msg_dict = pmt.make_dict()
> msg_dict = pmt.dict_add(msg_dict, pmt.intern("freq"),
> pmt.from_double(local_freq_to_send))
> # ...e publica AQUI, no thread principal.
> self.message_port_pub(pmt.intern("cmd"), msg_dict)
> return 0 import numpy as np
> from gnuradio import gr
> import pmt
> import scipy.special as scs
> from threading import Lock
>
> class Energy_Detector(gr.basic_block):
> def __init__(self, samples=1024, Pfa=0.01):
> self.samples = int(samples)
> self.Pfa = float(Pfa)
>
> gr.basic_block.__init__(self,
> name="Energy_Detector",
> in_sig=[(np.float32, self.samples), (np.float32, self.samples)],
> out_sig=None)
>
> self.message_port_register_out(pmt.intern("detection_in"))
> self.message_port_register_in(pmt.intern("freq_in"))
> # CORREÇÃO 1: Nome da função corrigido
> self.set_msg_handler(pmt.intern("freq_in"), self.handle_freq_msg)
> self.current_center_freq = 0.0 # Inicializa a frequência
> self.lock = Lock()
>
> def handle_freq_msg(self, msg):
> if pmt.is_dict(msg):
> freq_val = pmt.dict_ref(msg, pmt.intern("freq"), pmt.PMT_NIL)
> if not pmt.is_nil(freq_val):
> with self.lock:
> self.current_center_freq = pmt.to_double(freq_val)
>
> def general_work(self, input_items, output_items):
> signal_energy_vector = input_items[0][0]
> noise_vector = input_items[1][0]
>
> signalAvg = np.mean(signal_energy_vector)
> NoisePower = noise_vector ** 2
> NoiseAvg = np.mean(NoisePower)
> var = np.var(NoisePower)
> stdev = np.sqrt(var)
>
> Qinv = np.sqrt(2) * scs.erfinv(1 - 2 * self.Pfa)
> Threshold = NoiseAvg + Qinv * stdev
>
> # CORREÇÃO 3: Lógica de detecção corrigida
> if signalAvg > Threshold:
> local_center_freq = 0.0
> with self.lock:
> local_center_freq = self.current_center_freq
> # Só envia a mensagem se a frequência for válida (maior que zero)
> if local_center_freq > 0:
> msg_tuple = pmt.make_tuple(
> pmt.from_float(float(signalAvg)),
> pmt.from_float(1.0),
> pmt.from_double(local_center_freq)
> )
> self.message_port_pub(pmt.intern("detection_in"), msg_tuple)
>
> # CORREÇÃO 2: Consumo explícito das portas
> self.consume(0, 1)
> self.consume(1, 1)
> return 0 freq_hz = pmt.to_double(pmt.tuple_ref(msg, 2))
> freq_mhz = freq_hz / 1e6
>
> # Imprime o alerta formatado no terminal
> print("="*40)
> print("!!! ALERTA: SINAL DE DRONE DETECTADO !!!")
> print(f" Frequência: {freq_mhz:.2f} MHz")
> print(f" Energia do Sinal: {energy:.4f}")
> print("="*40, flush=True) # flush=True para garantir a impressão imediata
>
> except Exception as e:
> print(f"Alert Formatter - Erro ao processar mensagem: {e}")
> print(f"Mensagem recebida: {msg}")
>
> def work(self, input_items, output_items):
> # Este bloco não processa sinais de streaming, apenas mensagens.
> # Portanto, a função work não faz nada.
> return 0
No comments:
Post a Comment