Commit

improve logging
dpacheconr committed Nov 20, 2023
1 parent 0fcc0b7 commit 165ae61
Showing 3 changed files with 128 additions and 88 deletions.
Binary file modified .DS_Store
Binary file not shown.
53 changes: 36 additions & 17 deletions src/custom_parser/__init__.py
@@ -1,7 +1,6 @@
 import time
 from pyrfc3339 import parse
 import os
-import ast
 import json
 from fastcore.xtras import obj2dict
 
@@ -36,7 +35,7 @@ def check_env_vars():
         print(key + " not set")
         exit(1)
 
-def parse_attributes(obj,att_to_drop):
+def parse_attributes(obj,att_to_drop,otype):
     obj_atts = {}
     attributes_to_drop = []
     # todo
@@ -48,12 +47,14 @@ def parse_attributes(obj,att_to_drop):
     #         attributes_to_drop.append(attribute)
     #     except:
     #         print("Unable to parse GHA_ATTRIBUTES_DROP, check your configuration")
-
     for attribute in list(obj):
         attribute_name = str(attribute).lower()
         if attribute_name.endswith("_at"):
-            new_Att_name=attribute_name+"_ms"
-            obj_atts[new_Att_name]=do_time_ms(obj[attribute])
+            if obj['conclusion'] == 'skipped' or obj['conclusion'] == 'cancelled':
+                pass
+            else:
+                new_Att_name=attribute_name+"_ms"
+                obj_atts[new_Att_name]=do_time_ms(obj[attribute])
 
         if attribute_name not in attributes_to_drop:
             if do_parse(obj[attribute]):
@@ -67,8 +68,11 @@ def parse_attributes(obj,att_to_drop):
                         if attribute_name not in attributes_to_drop:
                             obj_atts[attribute_name]=str(obj[attribute][sub_att][att])
                             if attribute_name.endswith("_at"):
-                                new_Att_name=attribute_name+"_ms"
-                                obj_atts[new_Att_name]=do_time_ms(obj[attribute][sub_att][att])
+                                if obj['conclusion'] == 'skipped' or obj['conclusion'] == 'cancelled':
+                                    pass
+                                else:
+                                    new_Att_name=attribute_name+"_ms"
+                                    obj_atts[new_Att_name]=do_time_ms(obj[attribute][sub_att][att])
 
 
                 elif type(obj[attribute][sub_att]) is list:
@@ -80,24 +84,33 @@ def parse_attributes(obj,att_to_drop):
                                 if attribute_name not in attributes_to_drop:
                                     obj_atts[attribute_name]=str(key[att])
                                     if attribute_name.endswith("_at"):
-                                        new_Att_name=attribute_name+"_ms"
-                                        obj_atts[new_Att_name]=do_time_ms(key[att])
+                                        if obj['conclusion'] == 'skipped' or obj['conclusion'] == 'cancelled':
+                                            pass
+                                        else:
+                                            new_Att_name=attribute_name+"_ms"
+                                            obj_atts[new_Att_name]=do_time_ms(key[att])
 
                         else:
                             attribute_name = do_string(attribute)+"."+do_string(sub_att)
                             if attribute_name not in attributes_to_drop:
                                 obj_atts[attribute_name]=str(key)
                                 if attribute_name.endswith("_at"):
-                                    new_Att_name=attribute_name+"_ms"
-                                    obj_atts[new_Att_name]=do_time_ms(key)
+                                    if obj['conclusion'] == 'skipped' or obj['conclusion'] == 'cancelled':
+                                        pass
+                                    else:
+                                        new_Att_name=attribute_name+"_ms"
+                                        obj_atts[new_Att_name]=do_time_ms(key)
 
                 else:
                     attribute_name = do_string(attribute)+"."+do_string(sub_att)
                     if attribute_name not in attributes_to_drop:
                         obj_atts[attribute_name]=str(obj[attribute][sub_att])
                         if attribute_name.endswith("_at"):
-                            new_Att_name=attribute_name+"_ms"
-                            obj_atts[new_Att_name]=do_time_ms(obj[attribute][sub_att])
+                            if obj['conclusion'] == 'skipped' or obj['conclusion'] == 'cancelled':
+                                pass
+                            else:
+                                new_Att_name=attribute_name+"_ms"
+                                obj_atts[new_Att_name]=do_time_ms(obj[attribute][sub_att])
 
         elif type(obj[attribute]) is list:
             for key in obj[attribute]:
@@ -108,14 +121,20 @@ def parse_attributes(obj,att_to_drop):
                         if attribute_name not in attributes_to_drop:
                             obj_atts[attribute_name]=str(key[att])
                             if attribute_name.endswith("_at"):
-                                new_Att_name=attribute_name+"_ms"
-                                obj_atts[new_Att_name]=do_time_ms(key[att])
+                                if obj['conclusion'] == 'skipped' or obj['conclusion'] == 'cancelled':
+                                    pass
+                                else:
+                                    new_Att_name=attribute_name+"_ms"
+                                    obj_atts[new_Att_name]=do_time_ms(key[att])
         else:
             if do_parse(obj[attribute]):
                 attribute_name = do_string(attribute)
                 if attribute_name not in attributes_to_drop:
                     obj_atts[attribute_name]=str(obj[attribute])
                     if attribute_name.endswith("_at"):
-                        new_Att_name=attribute_name+"_ms"
-                        obj_atts[new_Att_name]=do_time_ms(obj[attribute])
+                        if obj['conclusion'] == 'skipped' or obj['conclusion'] == 'cancelled':
+                            pass
+                        else:
+                            new_Att_name=attribute_name+"_ms"
+                            obj_atts[new_Att_name]=do_time_ms(obj[attribute])
     return obj_atts
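
The guard added throughout `parse_attributes` is the same three lines at every nesting level: skip the `*_ms` conversion when the run concluded as skipped or cancelled, since those objects carry no usable timestamps. A minimal sketch of that idea as a helper; the function name and the `SKIP_CONCLUSIONS` constant are illustrative assumptions, not part of this commit:

```python
# Hypothetical helper -- a sketch of the guard this commit repeats inline.
SKIP_CONCLUSIONS = {"skipped", "cancelled"}

def add_time_ms(obj, obj_atts, attribute_name, value):
    """Store <attribute_name>_ms unless the run was skipped or cancelled."""
    if obj.get("conclusion") in SKIP_CONCLUSIONS:
        return  # skipped/cancelled runs have no meaningful "*_at" timestamps
    obj_atts[attribute_name + "_ms"] = do_time_ms(value)  # do_time_ms from this module
```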
163 changes: 92 additions & 71 deletions src/exporter.py
@@ -24,6 +24,16 @@
 GHA_RUN_NAME=os.getenv('GHA_RUN_NAME')
 GITHUB_API_URL=os.getenv('GITHUB_API_URL')
 
+# Check if debug is set
+if "GHA_DEBUG" in os.environ and os.getenv('GHA_DEBUG').lower() == "true":
+    print("Running on DEBUG mode")
+    import http.client as http_client
+    http_client.HTTPConnection.debuglevel = 1
+    LoggingInstrumentor().instrument(set_logging_format=True,log_level=logging.DEBUG)
+    logging.getLogger().setLevel(logging.DEBUG)
+else:
+    pass
+
 if NEW_RELIC_LICENSE_KEY.startswith("eu"):
     OTEL_EXPORTER_OTEL_ENDPOINT = "https://otlp.eu01.nr-data.net:4318"
 else:
@@ -47,10 +57,6 @@
     "github.resource.type": "span"
 }
 
-LoggingInstrumentor().instrument()
-# debug otel use configuration below
-# LoggingInstrumentor().instrument(set_logging_format=True,log_level=logging.DEBUG)
-# logging.basicConfig(filename="exporter.log",level=logging.DEBUG)
 
 #Set workflow level tracer and logger
 global_resource = Resource(attributes=global_attributes)
@@ -70,7 +76,8 @@
 
 # Trace parent
 workflow_run_atts = json.loads(get_workflow_run_by_run_id)
-atts=parse_attributes(workflow_run_atts,"")
+atts=parse_attributes(workflow_run_atts,"","workflow")
+print("Processing Workflow ->",GHA_RUN_NAME,"run id ->",GHA_RUN_ID)
 p_parent = tracer.start_span(name=str(GHA_RUN_NAME) + " - run: "+str(GHA_RUN_ID),attributes=atts,start_time=do_time(workflow_run_atts['run_started_at']),kind=trace.SpanKind.SERVER)
 
 # Download logs
@@ -94,74 +101,88 @@
 # Set Jobs tracer and logger
 pcontext = trace.set_span_in_context(p_parent)
 for job in job_lst:
-    child_0 = tracer.start_span(name=str(job['name']),context=pcontext,start_time=do_time(job['started_at']), kind=trace.SpanKind.CONSUMER)
-    child_0.set_attributes(create_resource_attributes(parse_attributes(job,"steps"),GHA_SERVICE_NAME))
-    p_sub_context = trace.set_span_in_context(child_0)
-
-    # Steps trace span
-    for index,step in enumerate(job['steps']):
-        # Set steps tracer and logger
-        resource_attributes ={SERVICE_NAME: GHA_SERVICE_NAME,"github.source": "github-exporter","github.resource.type": "span","workflow_run_id": GHA_RUN_ID}
-        resource_log = Resource(attributes=resource_attributes)
-        step_tracer = get_tracer(endpoint, headers, resource_log, "step_tracer")
-
-        resource_attributes.update(create_resource_attributes(parse_attributes(step,""),GHA_SERVICE_NAME))
-        resource_log = Resource(attributes=resource_attributes)
-        job_logger = get_logger(endpoint,headers,resource_log, "job_logger")
-
-        if step['conclusion'] == 'skipped' or step['conclusion'] == 'cancelled':
-            if index >= 1:
-                # Start time should be the previous step end time
-                step_started_at=job['steps'][index - 1]['completed_at']
-            else:
-                step_started_at=job['started_at']
-        else:
-            step_started_at=step['started_at']
-
-        child_1 = step_tracer.start_span(name=str(step['name']),start_time=do_time(step_started_at),context=p_sub_context,kind=trace.SpanKind.CONSUMER)
-        child_1.set_attributes(create_resource_attributes(parse_attributes(step,""),GHA_SERVICE_NAME))
-        with trace.use_span(child_1, end_on_exit=False):
-            # Parse logs
-            try:
-                with open ("./logs/"+str(job["name"])+"/"+str(step['number'])+"_"+str(step['name'].replace("/",""))+".txt") as f:
-                    for line in f.readlines():
-                        try:
-                            line_to_add = line[29:-1].strip()
-                            len_line_to_add = len(line_to_add)
-                            timestamp_to_add = line[0:23]
-                            if len_line_to_add > 0:
-                                if line_to_add.lower().startswith("##[error]"):
-                                    child_1.set_status(Status(StatusCode.ERROR,line_to_add[9:]))
-                                    child_0.set_status(Status(StatusCode.ERROR,"STEP: "+str(step['name'])+" failed"))
-                                # Convert ISO 8601 to timestamp
-                                try:
-                                    parsed_t = dp.isoparse(timestamp_to_add)
-                                except ValueError as e:
-                                    print("Line does not start with a date. Skip for now")
-                                    continue
-                                unix_timestamp = parsed_t.timestamp()*1000
-                                job_logger._log(level=logging.INFO,msg=line_to_add,extra={"log.timestamp":unix_timestamp,"log.time":timestamp_to_add},args="")
-                        except Exception as e:
-                            print("Error exporting log line ERROR: ", e)
-            except IOError as e:
-                if step['conclusion'] != 'skipped' or step['conclusion'] != 'cancelled':
-                    print("ERROR: Log file does not exist: "+str(job["name"])+"/"+str(step['number'])+"_"+str(step['name'].replace("/",""))+".txt")
-                else:
-                    pass #We expect log file to not exist
-
-            if step['conclusion'] == 'skipped' or step['conclusion'] == 'cancelled':
-                child_1.update_name(name=str(step['name']+"-SKIPPED"))
-                if index >= 1:
-                    #End time should be the previous step end time
-                    step_completed_at=job['steps'][index - 1]['completed_at']
-                else:
-                    step_completed_at=job['started_at']
-            else:
-                step_completed_at=step['completed_at']
-
-        child_1.end(end_time=do_time(step_completed_at))
-    child_0.end(end_time=do_time(job['completed_at']))
-    workflow_run_finish_time=do_time(job['completed_at'])
-p_parent.end(end_time=workflow_run_finish_time)
+    try:
+        print("Processing job ->",job['name'])
+        child_0 = tracer.start_span(name=str(job['name']),context=pcontext,start_time=do_time(job['started_at']), kind=trace.SpanKind.CONSUMER)
+        child_0.set_attributes(create_resource_attributes(parse_attributes(job,"steps","job"),GHA_SERVICE_NAME))
+        p_sub_context = trace.set_span_in_context(child_0)
+
+        # Steps trace span
+        for index,step in enumerate(job['steps']):
+            try:
+                print("Processing step ->",step['name'],"from job",job['name'])
+                # Set steps tracer and logger
+                resource_attributes ={SERVICE_NAME: GHA_SERVICE_NAME,"github.source": "github-exporter","github.resource.type": "span","workflow_run_id": GHA_RUN_ID}
+                resource_log = Resource(attributes=resource_attributes)
+                step_tracer = get_tracer(endpoint, headers, resource_log, "step_tracer")
+
+                resource_attributes.update(create_resource_attributes(parse_attributes(step,"","step"),GHA_SERVICE_NAME))
+                resource_log = Resource(attributes=resource_attributes)
+                job_logger = get_logger(endpoint,headers,resource_log, "job_logger")
+
+                if step['conclusion'] == 'skipped' or step['conclusion'] == 'cancelled':
+                    if index >= 1:
+                        # Start time should be the previous step end time
+                        step_started_at=job['steps'][index - 1]['completed_at']
+                    else:
+                        step_started_at=job['started_at']
+                else:
+                    step_started_at=step['started_at']
+
+                child_1 = step_tracer.start_span(name=str(step['name']),start_time=do_time(step_started_at),context=p_sub_context,kind=trace.SpanKind.CONSUMER)
+                child_1.set_attributes(create_resource_attributes(parse_attributes(step,"","job"),GHA_SERVICE_NAME))
+                with trace.use_span(child_1, end_on_exit=False):
+                    # Parse logs
+                    try:
+                        with open ("./logs/"+str(job["name"])+"/"+str(step['number'])+"_"+str(step['name'].replace("/",""))+".txt") as f:
+                            for line in f.readlines():
+                                try:
+                                    line_to_add = line[29:-1].strip()
+                                    len_line_to_add = len(line_to_add)
+                                    timestamp_to_add = line[0:23]
+                                    if len_line_to_add > 0:
+                                        if line_to_add.lower().startswith("##[error]"):
+                                            child_1.set_status(Status(StatusCode.ERROR,line_to_add[9:]))
+                                            child_0.set_status(Status(StatusCode.ERROR,"STEP: "+str(step['name'])+" failed"))
+                                        # Convert ISO 8601 to timestamp
+                                        try:
+                                            parsed_t = dp.isoparse(timestamp_to_add)
+                                        except ValueError as e:
+                                            print("Line does not start with a date. Skip for now")
+                                            continue
+                                        unix_timestamp = parsed_t.timestamp()*1000
+                                        job_logger._log(level=logging.INFO,msg=line_to_add,extra={"log.timestamp":unix_timestamp,"log.time":timestamp_to_add},args="")
+                                except Exception as e:
+                                    print("Error exporting log line ERROR: ", e)
+                    except IOError as e:
+                        if step['conclusion'] == 'skipped' or step['conclusion'] == 'cancelled':
+                            print("Log file not expected for this step ->",step['name'],"<- because its status is ->",step['conclusion'])
+                            pass #We don't expect log file to exist
+                        else:
+                            print("ERROR: Log file does not exist: "+str(job["name"])+"/"+str(step['number'])+"_"+str(step['name'].replace("/",""))+".txt")
+
+                    if step['conclusion'] == 'skipped' or step['conclusion'] == 'cancelled':
+                        child_1.update_name(name=str(step['name']+"-SKIPPED"))
+                        if index >= 1:
+                            #End time should be the previous step end time
+                            step_completed_at=job['steps'][index - 1]['completed_at']
+                        else:
+                            step_completed_at=job['started_at']
+                    else:
+                        step_completed_at=step['completed_at']
+
+                child_1.end(end_time=do_time(step_completed_at))
+                print("Finished processing step ->",step['name'],"from job",job['name'])
+            except Exception as e:
+                print("Unable to process step ->",step['name'],"<- due to error",e)
+
+        child_0.end(end_time=do_time(job['completed_at']))
+        workflow_run_finish_time=do_time(job['completed_at'])
+        print("Finished processing job ->",job['name'])
+    except Exception as e:
+        print("Unable to process job ->",job['name'],"<- due to error",e)
+
+p_parent.end(end_time=workflow_run_finish_time)
+print("Finished processing Workflow ->",GHA_RUN_NAME,"run id ->",GHA_RUN_ID)
+print("All data exported to New Relic")
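
Both timestamp fallbacks above follow the same rule: a skipped or cancelled step never actually ran, so its span borrows the previous step's `completed_at`, or the job's `started_at` when it is the first step. A compact sketch of that rule as a hypothetical helper (the exporter inlines this logic for both the start and end time):

```python
# Illustrative only -- the exporter writes this logic inline twice.
def fallback_time(job, index, field):
    """Return a usable timestamp for job['steps'][index]."""
    step = job['steps'][index]
    if step['conclusion'] in ('skipped', 'cancelled'):
        if index >= 1:
            return job['steps'][index - 1]['completed_at']  # previous step's end
        return job['started_at']  # first step: fall back to the job start
    return step[field]  # field is 'started_at' or 'completed_at'
```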

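The log parsing relies on the fixed layout of GitHub Actions log lines: a 28-character RFC 3339 timestamp, one space, then the message, which is why the code slices `line[0:23]` for a millisecond-precision timestamp and `line[29:-1]` for the text. A standalone sketch of that slicing; the sample line is an assumed example of the format:

```python
# Standalone illustration of the slicing used in exporter.py above.
import dateutil.parser as dp  # imported as dp in the exporter

sample = "2023-11-20T10:15:30.1234567Z ##[error]Process completed with exit code 1.\n"

timestamp_to_add = sample[0:23]      # "2023-11-20T10:15:30.123"
line_to_add = sample[29:-1].strip()  # message text after the timestamp column
if line_to_add.lower().startswith("##[error]"):
    error_message = line_to_add[9:]  # drop the "##[error]" marker
unix_timestamp = dp.isoparse(timestamp_to_add).timestamp() * 1000  # epoch ms
```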