hi everyone, i am trying to use gnuradio for drone detection but nothing is working, if anyone can help import numpy as np
from gnuradio import gr
import pmt
from threading import Timer, Lock
class freq_scanner(gr.sync_block):
def __init__(self, start_freq=2.4e9, stop_freq=2.5e9, step_freq=5e6, dwell_time=0.5):
gr.sync_block.__init__(self,
name="Frequency Scanner",
in_sig=None,
out_sig=None) # Não precisamos mais da saída de sinal
self.message_port_register_out(pmt.intern("cmd"))
self._start_freq = float(start_freq)
self._stop_freq = float(stop_freq)
self._step_freq = float(step_freq)
self._dwell_time = float(dwell_time)
self.current_freq = self._start_freq
# --- A "Caixa de Correio" agora guarda apenas um número (float) ---
self.freq_to_send = None
self.lock = Lock()
# Iniciamos o timer
self.timer = Timer(self._dwell_time, self.prepare_next_freq)
self.timer.start()
def prepare_next_freq(self):
"""Esta função é chamada pelo timer e APENAS CALCULA a frequência."""
next_freq = self.current_freq + self._step_freq
if next_freq > self._stop_freq:
next_freq = self._start_freq
self.current_freq = next_freq
print(f"Timer preparou! Próxima Freq: {self.current_freq / 1e6:.2f} MHz", flush=True)
# Coloca APENAS o número float na caixa de correio
with self.lock:
self.freq_to_send = self.current_freq
# Reinicia o timer
self.timer = Timer(self._dwell_time, self.prepare_next_freq)
self.timer.start()
def stop(self):
self.timer.cancel()
return True
def work(self, input_items, output_items):
"""A função work agora faz TODO o trabalho de PMT e publicação."""
local_freq_to_send = None
# Pega o número da caixa de correio
with self.lock:
if self.freq_to_send is not None:
local_freq_to_send = self.freq_to_send
self.freq_to_send = None # Esvazia a caixa
# Se havia um número para enviar...
if local_freq_to_send is not None:
# ...Cria o dicionário PMT AQUI, no thread principal...
msg_dict = pmt.make_dict()
msg_dict = pmt.dict_add(msg_dict, pmt.intern("freq"), pmt.from_double(local_freq_to_send))
# ...e publica AQUI, no thread principal.
self.message_port_pub(pmt.intern("cmd"), msg_dict)
return 0 import numpy as np
from gnuradio import gr
import pmt
import scipy.special as scs
from threading import Lock
class Energy_Detector(gr.basic_block):
def __init__(self, samples=1024, Pfa=0.01):
self.samples = int(samples)
self.Pfa = float(Pfa)
gr.basic_block.__init__(self,
name="Energy_Detector",
in_sig=[(np.float32, self.samples), (np.float32, self.samples)],
out_sig=None)
self.message_port_register_out(pmt.intern("detection_in"))
self.message_port_register_in(pmt.intern("freq_in"))
# CORREÇÃO 1: Nome da função corrigido
self.set_msg_handler(pmt.intern("freq_in"), self.handle_freq_msg)
self.current_center_freq = 0.0 # Inicializa a frequência
self.lock = Lock()
def handle_freq_msg(self, msg):
if pmt.is_dict(msg):
freq_val = pmt.dict_ref(msg, pmt.intern("freq"), pmt.PMT_NIL)
if not pmt.is_nil(freq_val):
with self.lock:
self.current_center_freq = pmt.to_double(freq_val)
def general_work(self, input_items, output_items):
signal_energy_vector = input_items[0][0]
noise_vector = input_items[1][0]
signalAvg = np.mean(signal_energy_vector)
NoisePower = noise_vector ** 2
NoiseAvg = np.mean(NoisePower)
var = np.var(NoisePower)
stdev = np.sqrt(var)
Qinv = np.sqrt(2) * scs.erfinv(1 - 2 * self.Pfa)
Threshold = NoiseAvg + Qinv * stdev
# CORREÇÃO 3: Lógica de detecção corrigida
if signalAvg > Threshold:
local_center_freq = 0.0
with self.lock:
local_center_freq = self.current_center_freq
# Só envia a mensagem se a frequência for válida (maior que zero)
if local_center_freq > 0:
msg_tuple = pmt.make_tuple(
pmt.from_float(float(signalAvg)),
pmt.from_float(1.0),
pmt.from_double(local_center_freq)
)
self.message_port_pub(pmt.intern("detection_in"), msg_tuple)
# CORREÇÃO 2: Consumo explícito das portas
self.consume(0, 1)
self.consume(1, 1)
return 0 freq_hz = pmt.to_double(pmt.tuple_ref(msg, 2))
freq_mhz = freq_hz / 1e6
# Imprime o alerta formatado no terminal
print("="*40)
print("!!! ALERTA: SINAL DE DRONE DETECTADO !!!")
print(f" Frequência: {freq_mhz:.2f} MHz")
print(f" Energia do Sinal: {energy:.4f}")
print("="*40, flush=True) # flush=True para garantir a impressão imediata
except Exception as e:
print(f"Alert Formatter - Erro ao processar mensagem: {e}")
print(f"Mensagem recebida: {msg}")
def work(self, input_items, output_items):
# Este bloco não processa sinais de streaming, apenas mensagens.
# Portanto, a função work não faz nada.
No comments:
Post a Comment