Skip to content

Commit

Permalink
add tests for asm masking and threshold
Browse files Browse the repository at this point in the history
  • Loading branch information
lofaldli committed Jun 1, 2018
1 parent 1865e6f commit b70cb0a
Showing 1 changed file with 61 additions and 2 deletions.
63 changes: 61 additions & 2 deletions python/qa_correlator.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def test_001_t (self):
self.tb.start()

while dbg.num_messages() < 1:
time.sleep(0.1)
time.sleep(0.001)

self.tb.stop()
self.tb.wait()
Expand Down Expand Up @@ -78,7 +78,7 @@ def test_002_inverted_bits (self):
self.tb.start()

while dbg.num_messages() < 1:
time.sleep(0.1)
time.sleep(0.001)

self.tb.stop()
self.tb.wait()
Expand All @@ -89,6 +89,65 @@ def test_002_inverted_bits (self):
assert frame_len == len(data_out)
assert random_data == data_out

def test_003_asm_mask (self):
# asm with errors
asm = (0x1f, 0xcf, 0xf0, 0x1d)
frame_len = 223
random_data = tuple(random.randint(0, 255) for _ in range(frame_len))

data_in = asm + random_data

src = blocks.vector_source_b(data_in, repeat=True)
unpack = blocks.unpack_k_bits_bb(8)
mapper = digital.map_bb((1,0))
# mask to ignore errors
corr = ccsds.correlator(0x1acffc1d, 0xf0fff0ff, 0, frame_len)
dbg = blocks.message_debug()
self.tb.connect(src, unpack, mapper, corr)
self.tb.msg_connect((corr, 'out'), (dbg, 'store'))
self.tb.start()

while dbg.num_messages() < 1:
time.sleep(0.001)

self.tb.stop()
self.tb.wait()

msg = dbg.get_message(0)
data_out = tuple(pmt.to_python(pmt.cdr(msg)))

assert frame_len == len(data_out)
assert random_data == data_out

def test_004_threshold (self):
# asm with a few bits wrong
asm = (0x3a, 0xcf, 0xec, 0x1d)
frame_len = 223
random_data = tuple(random.randint(0, 255) for _ in range(frame_len))

data_in = asm + random_data

src = blocks.vector_source_b(data_in, repeat=True)
unpack = blocks.unpack_k_bits_bb(8)
mapper = digital.map_bb((1,0))
# threshold set to 2
corr = ccsds.correlator(0x1acffc1d, 0xffffffff, 2, frame_len)
dbg = blocks.message_debug()
self.tb.connect(src, unpack, mapper, corr)
self.tb.msg_connect((corr, 'out'), (dbg, 'store'))
self.tb.start()

while dbg.num_messages() < 1:
time.sleep(0.001)

self.tb.stop()
self.tb.wait()

msg = dbg.get_message(0)
data_out = tuple(pmt.to_python(pmt.cdr(msg)))

assert frame_len == len(data_out)
assert random_data == data_out

if __name__ == '__main__':
gr_unittest.run(qa_correlator, "qa_correlator.xml")

0 comments on commit b70cb0a

Please sign in to comment.