Embedded Python Blocks:
Each time this file is saved, GRC will instantiate the first class it finds
to get ports and parameters of your block. The arguments to __init__ will
be the parameters. All of them are required to have default values!
"""
import numpy as np
from gnuradio import gr
import pmt
class blk(gr.sync_block): # other base classes are basic_block, decim_block, interp_block
"""
Embedded Python Block
Function changing periodically the signal source's frequency, from
f_min to f_max, by step of BW_per_channel [i.e. (f_max-f_min)/(n_channels-1)]
samp_rate ==> samp_rate values per signal period
sweep_rate ==> sweep_rate cycling per second
Means we stay samp_rate/sweep_rate values in each channel
[N.B.: n_channels ==> samp_rate/sweep_rate/n_channels in each channel]
"""
def __init__(self, n_channels=3, samp_rate=32e3, sweep_rate=1, f_start=2.4e3, f_stop=2.5e3): # only default arguments here
"""arguments to this function show up as parameters in GRC"""
gr.sync_block.__init__(
self,
name='Frequency sweeper', # will show up in GRC
in_sig=[np.float32],
out_sig=[]
)
# if an attribute with the same name as a parameter is found,
# a callback is registered (properties work, too).
self.n_channels = n_channels
self.samp_rate = samp_rate
self.sweep_rate = sweep_rate
self.f_start = f_start
self.f_stop = f_stop
self.BW_tot = self.f_stop - f_start
self.frequencies = [self.f_start + i*self.BW_tot/(self.n_channels-1) for i in range(self.n_channels)]
#self.t_per_freq = self.samp_rate/self.sweep_rate/(self.n_channels - 1)
self.counter = 0 # if bigger than given threshold, update the frequency
self.i = 0 # which frequency to select
self.t_per_freq = self.samp_rate/self.sweep_rate/(self.n_channels - 1)
self.outPortFreq = "frequency"
self.message_port_register_out(pmt.intern(self.outPortFreq))
self.outPortDebugMsg = "debug"
self.message_port_register_out(pmt.intern(self.outPortDebugMsg))
def work(self, input_items, output_items):
"""
Example: change frequency of signal source periodically
samp_rate ==> samp_rate values per signal period
sweep_rate ==> sweep_rate cycling per second
Means we stay samp_rate/sweep_rate values in each channel
[N.B.: n_channels ==> samp_rate/sweep_rate/n_channels in each channel]
"""
# Some debug message printing the value of variable "t_per_freq"
pmt_msg_debug = pmt.dict_add(pmt.make_dict(), pmt.intern("t_per_freq"), pmt.from_double(self.t_per_freq))
self.message_port_pub(pmt.intern(self.outPortDebugMsg), pmt_msg_debug)
self.counter = self.counter + len(input_items[0])
# Some debug message printing the value of variable "counter"
pmt_msg_debug = pmt.dict_add(pmt.make_dict(), pmt.intern("counter"), pmt.from_double(self.counter))
self.message_port_pub(pmt.intern(self.outPortDebugMsg), pmt_msg_debug)
if self.counter > self.t_per_freq:
self.i += 1
self.counter = 0
if self.i > len(self.frequencies)-1: self.i = 0
# Message sent to update the source frequency
pmt_msg = pmt.dict_add(pmt.make_dict(), pmt.intern("freq"), pmt.from_double(self.frequencies[self.i]))
self.message_port_pub(pmt.intern(self.outPortFreq), pmt_msg)
# Some debug message printing the value of variable "freq"
pmt_msg_debug = pmt.dict_add(pmt.make_dict(), pmt.intern("freq"), pmt.from_double(self.frequencies[self.i]))
self.message_port_pub(pmt.intern(self.outPortDebugMsg), pmt_msg_debug)
return len(input_items[0])
#blocks.message_strobe(pmt.from_float(300), int(10000*samp_rate/sweep_rate/n_channels))

No comments:
Post a Comment