Friday, October 17, 2025

Re: Detection of drones

Hi Pedro,

"detecing drones" feels like an overall complicated problem, and we can't know what you
mean by "nothing is working", I'm afraid!

I see you have multiple out-of-tree blocks (some or all of which you have written
yourself?) in your flow graph – cool!

So, go ahead and make sure they work individually. Especially, build something that allows
you to test your "Energy Detector" block without an SDR, in pure software!

I haven't done an intense reading of your Python code, but there's definitely a syntax
error in there – so you really need to make sure all things work in isolation before
plugging them together.

Best,
Marcus


On 10/16/25 4:13 PM, Pedro Tapia wrote:
> hi everyone, i am trying to use gnuradio for drone detection but nothing is working, if
> anyone can help import numpy as np
> from gnuradio import gr
> import pmt
> from threading import Timer, Lock
>
> class freq_scanner(gr.sync_block):
>     def __init__(self, start_freq=2.4e9, stop_freq=2.5e9, step_freq=5e6, dwell_time=0.5):
>         gr.sync_block.__init__(self,
>             name="Frequency Scanner",
>             in_sig=None,
>             out_sig=None) # Não precisamos mais da saída de sinal
>
>         self.message_port_register_out(pmt.intern("cmd"))
>
>         self._start_freq = float(start_freq)
>         self._stop_freq = float(stop_freq)
>         self._step_freq = float(step_freq)
>         self._dwell_time = float(dwell_time)
>         self.current_freq = self._start_freq
>         # --- A "Caixa de Correio" agora guarda apenas um número (float) ---
>         self.freq_to_send = None
>         self.lock = Lock()
>
>         # Iniciamos o timer
>         self.timer = Timer(self._dwell_time, self.prepare_next_freq)
>         self.timer.start()
>
>     def prepare_next_freq(self):
>         """Esta função é chamada pelo timer e APENAS CALCULA a frequência."""
>         next_freq = self.current_freq + self._step_freq
>         if next_freq > self._stop_freq:
>             next_freq = self._start_freq
>         self.current_freq = next_freq
>         print(f"Timer preparou! Próxima Freq: {self.current_freq / 1e6:.2f} MHz", flush=True)
>         # Coloca APENAS o número float na caixa de correio
>         with self.lock:
>             self.freq_to_send = self.current_freq
>
>         # Reinicia o timer
>         self.timer = Timer(self._dwell_time, self.prepare_next_freq)
>         self.timer.start()
>
>     def stop(self):
>         self.timer.cancel()
>         return True
>
>     def work(self, input_items, output_items):
>         """A função work agora faz TODO o trabalho de PMT e publicação."""
>         local_freq_to_send = None
>         # Pega o número da caixa de correio
>         with self.lock:
>             if self.freq_to_send is not None:
>                 local_freq_to_send = self.freq_to_send
>                 self.freq_to_send = None # Esvazia a caixa
>
>         # Se havia um número para enviar...
>         if local_freq_to_send is not None:
>             # ...Cria o dicionário PMT AQUI, no thread principal...
>             msg_dict = pmt.make_dict()
>             msg_dict = pmt.dict_add(msg_dict, pmt.intern("freq"),
> pmt.from_double(local_freq_to_send))
>             # ...e publica AQUI, no thread principal.
>             self.message_port_pub(pmt.intern("cmd"), msg_dict)
>         return 0  import numpy as np
> from gnuradio import gr
> import pmt
> import scipy.special as scs
> from threading import Lock
>
> class Energy_Detector(gr.basic_block):
>     def __init__(self, samples=1024, Pfa=0.01):
>         self.samples = int(samples)
>         self.Pfa = float(Pfa)
>
>         gr.basic_block.__init__(self,
>             name="Energy_Detector",
>             in_sig=[(np.float32, self.samples), (np.float32, self.samples)],
>             out_sig=None)
>
>         self.message_port_register_out(pmt.intern("detection_in"))
>         self.message_port_register_in(pmt.intern("freq_in"))
>         # CORREÇÃO 1: Nome da função corrigido
>         self.set_msg_handler(pmt.intern("freq_in"), self.handle_freq_msg)
>         self.current_center_freq = 0.0 # Inicializa a frequência
>         self.lock = Lock()
>
>     def handle_freq_msg(self, msg):
>         if pmt.is_dict(msg):
>             freq_val = pmt.dict_ref(msg, pmt.intern("freq"), pmt.PMT_NIL)
>             if not pmt.is_nil(freq_val):
>                 with self.lock:
>                     self.current_center_freq = pmt.to_double(freq_val)
>
>     def general_work(self, input_items, output_items):
>         signal_energy_vector = input_items[0][0]
>         noise_vector = input_items[1][0]
>
>         signalAvg = np.mean(signal_energy_vector)
>         NoisePower = noise_vector ** 2
>         NoiseAvg = np.mean(NoisePower)
>         var = np.var(NoisePower)
>         stdev = np.sqrt(var)
>
>         Qinv = np.sqrt(2) * scs.erfinv(1 - 2 * self.Pfa)
>         Threshold = NoiseAvg + Qinv * stdev
>
>         # CORREÇÃO 3: Lógica de detecção corrigida
>         if signalAvg > Threshold:
>             local_center_freq = 0.0
>             with self.lock:
>                 local_center_freq = self.current_center_freq
>             # Só envia a mensagem se a frequência for válida (maior que zero)
>             if local_center_freq > 0:
>                 msg_tuple = pmt.make_tuple(
>                     pmt.from_float(float(signalAvg)),
>                     pmt.from_float(1.0),
>                     pmt.from_double(local_center_freq)
>                 )
>                 self.message_port_pub(pmt.intern("detection_in"), msg_tuple)
>
>         # CORREÇÃO 2: Consumo explícito das portas
>         self.consume(0, 1)
>         self.consume(1, 1)
>         return 0     freq_hz = pmt.to_double(pmt.tuple_ref(msg, 2))
>                 freq_mhz = freq_hz / 1e6
>
>                 # Imprime o alerta formatado no terminal
>                 print("="*40)
>                 print("!!! ALERTA: SINAL DE DRONE DETECTADO !!!")
>                 print(f"    Frequência: {freq_mhz:.2f} MHz")
>                 print(f"    Energia do Sinal: {energy:.4f}")
>                 print("="*40, flush=True) # flush=True para garantir a impressão imediata
>
>         except Exception as e:
>             print(f"Alert Formatter - Erro ao processar mensagem: {e}")
>             print(f"Mensagem recebida: {msg}")
>
>     def work(self, input_items, output_items):
>         # Este bloco não processa sinais de streaming, apenas mensagens.
>         # Portanto, a função work não faz nada.
>         return 0

No comments:

Post a Comment