Hi everyone,
I'm working on a cognitive radio demonstrator as part of my bachelor thesis and I'm currently facing issues with overflows and underflows in my GNU Radio flowgraph when using a USRP device.
I've attached screenshots of the flowgraph I use for simulations.
I think the issue might be related to my OOT blocks. I need to set the occupied carriers dynamically, but I couldn't find a way to do that with the existing GNU Radio blocks, so I wrote a custom block for that purpose.
However, the two inputs of the allocator block do not arrive in step, and the latency between them ends up crashing the block. I modified my code to buffer both inputs, but it now runs quite slowly.
Does anyone have suggestions on how to address this issue, or any ideas on how to dynamically set the number of subcarriers in the allocator with an existing block?
Thanks in advance.
Here is the code of the allocator block:
import numpy as np
from collections import deque
from gnuradio import gr


class Allocator(gr.sync_block):
    """
    Allocator block optimized for performance
    """

    def __init__(self, mask_len=128):
        gr.sync_block.__init__(self,
            name="Allocator",
            # Input 0: stream of complex data samples.
            # Input 1: one float mask vector per OFDM symbol.
            in_sig=[np.complex64, (np.float32, mask_len)],
            out_sig=[(np.complex64, mask_len)]
        )
        self.mask_len = mask_len
        # Internal buffers that hold input not yet mapped to a symbol.
        self.data_buffer = np.zeros(0, dtype=np.complex64)
        self.mask_buffer = deque()

    def work(self, input_items, output_items):
        in_data = input_items[0]
        in_mask = input_items[1]
        out = output_items[0]
        n_symbols = min(len(in_mask), len(out))

        # Queue everything that arrived in this call.
        self.data_buffer = np.append(self.data_buffer, in_data)
        self.mask_buffer.extend(in_mask)

        symbols_written = 0
        data_idx = 0
        for i in range(n_symbols):
            if len(self.mask_buffer) == 0:
                break
            mask = self.mask_buffer.popleft()
            if len(mask) != self.mask_len:
                raise ValueError(
                    f"expected mask length {self.mask_len}, got {len(mask)}")

            # A mask value below 0.5 marks the carrier as usable.
            occupied_carriers = np.where(mask < 0.5)[0]
            num_occupied = len(occupied_carriers)
            # Stop when there is not enough data left to fill this symbol.
            if data_idx + num_occupied > len(self.data_buffer):
                break

            ofdm_symbol = np.zeros(self.mask_len, dtype=np.complex64)
            ofdm_symbol[occupied_carriers] = self.data_buffer[data_idx:data_idx + num_occupied]
            data_idx += num_occupied
            out[i][:] = ofdm_symbol
            symbols_written += 1

        # Drop the data samples that were placed on carriers.
        self.data_buffer = self.data_buffer[data_idx:]
        return symbols_written
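
For reference, the per-symbol mapping can also be written as a stateless, vectorized NumPy function, which makes it easier to test outside the flowgraph. This is only a minimal sketch under a few assumptions: `allocate` is a hypothetical helper (not a GNU Radio function), `masks` is a 2-D array with one row per OFDM symbol, and a mask value below 0.5 marks a usable carrier, as in the block above. It returns how many data samples and masks it actually used, which are the kind of counts a `gr.basic_block` could pass to `consume()` for each input instead of keeping internal buffers. It has not been tested against a USRP.

```python
import numpy as np


def allocate(data, masks, max_symbols):
    """Map data samples onto the usable carriers of each mask.

    Hypothetical helper, not part of GNU Radio.
    data:        1-D array of complex samples.
    masks:       2-D array, shape (n_masks, mask_len); value < 0.5 = usable.
    max_symbols: upper bound on the number of symbols to produce.
    Returns (symbols, data_used, masks_used).
    """
    data = np.asarray(data, dtype=np.complex64)
    free = np.asarray(masks)[:max_symbols] < 0.5   # boolean, one row per symbol
    needed = np.cumsum(free.sum(axis=1))           # running total of samples needed

    # Number of complete symbols that the available data can fill.
    n_sym = int(np.searchsorted(needed, len(data), side="right"))
    data_used = int(needed[n_sym - 1]) if n_sym else 0

    free = free[:n_sym]
    symbols = np.zeros(free.shape, dtype=np.complex64)
    # Boolean assignment fills row by row, carriers in ascending order.
    symbols[free] = data[:data_used]
    return symbols, data_used, n_sym


# Small usage example with 4 carriers per symbol.
example_masks = np.array([[0, 1, 0, 1],
                          [1, 1, 0, 0],
                          [0, 0, 0, 0]], dtype=np.float32)
example_data = np.arange(1, 6, dtype=np.complex64)   # 5 samples: 1..5
example_symbols, example_data_used, example_masks_used = allocate(
    example_data, example_masks, max_symbols=3)
```

In the example, the third mask needs four samples but only one is left after the first two symbols, so the function stops after two symbols and leaves that sample unconsumed for the next call.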