From b70cb0af71dfd2e217d49852033de5d66cccf853 Mon Sep 17 00:00:00 2001 From: lofaldli Date: Fri, 1 Jun 2018 13:09:15 +0200 Subject: [PATCH] add tests for asm masking and threshold --- python/qa_correlator.py | 63 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 61 insertions(+), 2 deletions(-) diff --git a/python/qa_correlator.py b/python/qa_correlator.py index 932b24d..aac6ec1 100755 --- a/python/qa_correlator.py +++ b/python/qa_correlator.py @@ -50,7 +50,7 @@ def test_001_t (self): self.tb.start() while dbg.num_messages() < 1: - time.sleep(0.1) + time.sleep(0.001) self.tb.stop() self.tb.wait() @@ -78,7 +78,7 @@ def test_002_inverted_bits (self): self.tb.start() while dbg.num_messages() < 1: - time.sleep(0.1) + time.sleep(0.001) self.tb.stop() self.tb.wait() @@ -89,6 +89,65 @@ def test_002_inverted_bits (self): assert frame_len == len(data_out) assert random_data == data_out + def test_003_asm_mask (self): + # asm with errors + asm = (0x1f, 0xcf, 0xf0, 0x1d) + frame_len = 223 + random_data = tuple(random.randint(0, 255) for _ in range(frame_len)) + + data_in = asm + random_data + + src = blocks.vector_source_b(data_in, repeat=True) + unpack = blocks.unpack_k_bits_bb(8) + mapper = digital.map_bb((1,0)) + # mask to ignore errors + corr = ccsds.correlator(0x1acffc1d, 0xf0fff0ff, 0, frame_len) + dbg = blocks.message_debug() + self.tb.connect(src, unpack, mapper, corr) + self.tb.msg_connect((corr, 'out'), (dbg, 'store')) + self.tb.start() + + while dbg.num_messages() < 1: + time.sleep(0.001) + + self.tb.stop() + self.tb.wait() + + msg = dbg.get_message(0) + data_out = tuple(pmt.to_python(pmt.cdr(msg))) + + assert frame_len == len(data_out) + assert random_data == data_out + + def test_004_threshold (self): + # asm with a few bits wrong + asm = (0x3a, 0xcf, 0xec, 0x1d) + frame_len = 223 + random_data = tuple(random.randint(0, 255) for _ in range(frame_len)) + + data_in = asm + random_data + + src = blocks.vector_source_b(data_in, repeat=True) + unpack = blocks.unpack_k_bits_bb(8) + mapper = digital.map_bb((1,0)) + # threshold set to 2 + corr = ccsds.correlator(0x1acffc1d, 0xffffffff, 2, frame_len) + dbg = blocks.message_debug() + self.tb.connect(src, unpack, mapper, corr) + self.tb.msg_connect((corr, 'out'), (dbg, 'store')) + self.tb.start() + + while dbg.num_messages() < 1: + time.sleep(0.001) + + self.tb.stop() + self.tb.wait() + + msg = dbg.get_message(0) + data_out = tuple(pmt.to_python(pmt.cdr(msg))) + + assert frame_len == len(data_out) + assert random_data == data_out if __name__ == '__main__': gr_unittest.run(qa_correlator, "qa_correlator.xml")