The work() functions are for stream handling. You may have already readThe approach here should be to create a separate thread in your init, and have that thread periodically public a message. There is a Message Strobe block that does this (in C++) and you could take a look at that code as an example.On Mon, Nov 29, 2021 at 10:06 AM Michelle <mkprojects731@gmail.com> wrote:Good morning,
I want to generate a source controlled by voltage. Basically, I send messages to a source in order to change its amplitude. I have built a block to generate messages and pass them to the source. To do so, I created an "embedded python block " that generates and transmits a message every second, here is the python code of my block:
from gnuradio import gr, blocks
import pmt
import numpy
import time
#The block receives input data (the amplitude of a signal) then generates a message to be passed to a constant source so that the latter modifies its amplitude"
class message_generator(gr.sync_block):
def __init__(self):
gr.sync_block.__init__(
self,
name="message generator",
in_sig=[numpy.float32], #amplitude that the signal must have
out_sig=None
)
self.message_port_register_out(pmt.intern('out_port'))
def work(self, input_items, output_items):
self.msg_port_pub(pmt.intern('out_port'), pmt.cons(pmt.intern("ampl"), pmt.from_double(input_items[0][0])))
time.sleep(1)
return 1
when I execute my flowgraph in gnuradio I have the error : self.msg_port_pub(pmt.intern('out_port'), pmt.cons(pmt.intern("ampl"), pmt.from_double(input_items[0][0])))
AttributeError: 'message_generator' object has no attribute 'msg_port_pub' . And I can't understand why if in the qa_python_message_passing.py (below ) the function message_port_pub is called the same way as I do in mine.
class message_generator(gr.sync_block):
def __init__(self, msg_interval = 10):
gr.sync_block.__init__(
self,
name="message generator",
in_sig=[numpy.float32], #amplitude that the signal must have
out_sig=None
)
self.msg_list = []
#self.message_port_register_in(pmt.intern('in_port'))
self.message_port_register_out(pmt.intern('out_port'))
self.msg_interval = msg_interval
self.msg_ctr = 0
def work(self, input_items, output_items):
inLen = len(input_items[0])
# Create a new PMT for each input value and put it in the message list
self.msg_list.append(pmt.from_double(input_items[0][0]))
while self.msg_ctr < len(self.msg_list) and \
(self.msg_ctr * self.msg_interval) < \
(self.nitems_read(0) + inLen):
self.message_port_pub(pmt.intern('out_port'),
pmt.cons(pmt.intern("ampl"),self.msg_list[self.msg_ctr]))
self.msg_ctr += 1
return inLen
PLease can you explain to me what is wrong on my code? I even tried to put an attribute "msg_port_pub" when declaring the class but I still have the same error.
from gnuradio import gr, blocks
import pmt
import numpy
import time
class message_generator(gr.sync_block):
def __init__(self):
gr.sync_block.__init__(
self,
name="message generator",
in_sig=[numpy.float32], #amplitude that the signal must have
out_sig=None
)
d_port= self.message_port_register_out(pmt.intern('out_port'))
self.msg_port_pub(d_port, self.buildMessage)
def buildMessage(self, data):
msg = pmt.cons(pmt.intern("ampl"), pmt.from_double(data))
return msg
def work(self, input_items, output_items):
messageToSend = builMessage(input_items=[0][0])
self.msg_port_pub(d_port, messageToSend)
time.sleep(1)
return 1
You can find attached my flowgraph and the resulting python code.
Thank you in advance and have a good day.
Read the mailing list of the GNU project right here! The information here is regarding the GNU radio project for USRP radios.
Monday, November 29, 2021
Re: Problems when passing messages to a source block
Minor correction - as is done in Message Strobe, you don't want the thread to start running until start() is called. You can create the thread in init() or start(). The Message Strobe creates its thread in start().
On Mon, Nov 29, 2021 at 11:07 AM Jeff Long <willcode4@gmail.com> wrote:
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment