diff --git a/.gitignore b/.gitignore index bca2a5a..add4956 100644 --- a/.gitignore +++ b/.gitignore @@ -319,3 +319,4 @@ $RECYCLE.BIN/ *.pyc /CODE/SysInternal_Suite/.sys.ignore /ACCESS/ +/CODE/VulnScan/tools/NN features/ diff --git a/.idea/Logicytics.iml b/.idea/Logicytics.iml index e33fd63..235b40b 100644 --- a/.idea/Logicytics.iml +++ b/.idea/Logicytics.iml @@ -16,6 +16,8 @@ + + @@ -33,6 +35,7 @@ diff --git a/CODE/Logicytics.py b/CODE/Logicytics.py index a5f5583..381becb 100644 --- a/CODE/Logicytics.py +++ b/CODE/Logicytics.py @@ -16,11 +16,12 @@ # Initialization FileManagement.mkdir() log = Log({"log_level": DEBUG, "delete_log": DELETE_LOGS}) +ACTION = None +SUB_ACTION = None class Health: @staticmethod - @log.function def backup(directory: str, name: str): """ Creates a backup of a specified directory by zipping its contents and moving it to a designated backup location. @@ -47,7 +48,6 @@ def backup(directory: str, name: str): shutil.move(f"{name}.zip", "../ACCESS/BACKUP") @staticmethod - @log.function def update() -> tuple[str, str]: """ Updates the repository by pulling the latest changes from the remote repository. @@ -325,9 +325,13 @@ def threaded_execution(execution_list_thread, index_thread): def zip_generated_files(): """Zips generated files based on the action.""" - - def zip_and_log(directory, name): - zip_values = FileManagement.Zip.and_hash(directory, name, ACTION) + def zip_and_log(directory: str, name: str): + log.debug(f"Zipping directory '{directory}' with name '{name}' under action '{ACTION}'") + zip_values = FileManagement.Zip.and_hash( + directory, + name, + ACTION if ACTION is not None else f"ERROR_NO_ACTION_SPECIFIED_{datetime.now().isoformat()}" + ) if isinstance(zip_values, str): log.error(zip_values) else: diff --git a/CODE/VulnScan/Documentation.md b/CODE/VulnScan/Documentation.md index 4b750fd..7b0f5dc 100644 --- a/CODE/VulnScan/Documentation.md +++ b/CODE/VulnScan/Documentation.md @@ -107,3 +107,32 @@ VulnScan is designed to detect sensitive data across various file formats. It of - **Progress Tracking**: Visualizes accuracy and loss per epoch with graphs. - **Error Handling**: Logs errors for missing files, attribute issues, or unexpected conditions. - **Extensibility**: Supports plug-and-play integration for new algorithms or datasets. + + + # More files + + There is a repository that archives all the data used to make the model, + as well as previously trained models for you to test out + (loading scripts and vectorizers are not included; a minimal loading sketch is shown below). + + The repository is located [here](https://github.com/DefinetlyNotAI/VulnScan_TrainingData). + + The repository contains the following directories: + - `Training Data`: Contains the data used to train the models. It is organized by file size and file count, except for the Tests data, which is explicitly labelled as text. + - `Archived Models`: Contains the previously trained models. It is organized by model type, then version. + - `NN features`: Contains information about the model `.3n3` and the vectorizer used. The information includes: + - `Documentation_Study_Network.md`: A markdown file that contains more info. + - `Neural Network Nodes Graph.gexf`: A Gephi file that contains the model nodes and edges. + - `Nodes and edges (GEPHI).csv`: A CSV file that contains the model nodes and edges. + - `Statistics`: Directories made by Gephi, containing the statistics of the model nodes and edges. + - `Feature_Importance.svg`: An SVG file that contains the feature importance of the model.
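Since the archive ships the models without loading scripts, the following is a minimal sketch of how an archived model and its vectorizer could be loaded for a quick test. It assumes a joblib-pickled vectorizer and a fully pickled `.pth` model; the file names mirror the ones referenced by `_study_network.py` in this changeset, and the final sigmoid step assumes a single-logit classifier head, so adapt both to the files you actually download.

```python
# Minimal sketch, not an official loader. Assumptions: joblib-pickled vectorizer,
# fully pickled torch model (not a bare state_dict), single-logit output.
# The file names follow _study_network.py; point them at your downloaded files.
import joblib
import torch

vectorizer = joblib.load("Vectorizer .3n3.pkl")
model = torch.load("Model SenseMini .3n3.pth", map_location="cpu", weights_only=False)
model.eval()

sample = ["Name: Jane Doe, SSN: 000-00-0000, Password: hunter2"]
features = torch.tensor(vectorizer.transform(sample).toarray(), dtype=torch.float32)

with torch.no_grad():
    # If your model returns per-class scores instead of one logit,
    # swap the sigmoid/.item() for a softmax and an argmax over classes.
    score = torch.sigmoid(model(features)).squeeze().item()
print(f"Sensitive-content score: {score:.3f}")
```

A model only accepts the input dimensionality produced by the vectorizer it was trained with, so always pair each archived model with its matching vectorizer.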
+ - `Loss_Landscape_3D.html`: A HTML file that contains the 3D loss landscape of the model. + - `Model Accuracy Over Epochs.png` and `Model Loss Over Epochs.png`: PNG files that contain the model accuracy and loss over epochs. + - `Model state dictionary.txt`: A text file that contains the model state dictionary. + - `Model Summary.txt`: A text file that contains the model summary. + - `Model Visualization.png`: A PNG file that contains the model visualization. + - `Top_90_Features.svg`: A SVG file that contains the top 90 features of the model. + - `Vectorizer features.txt`: A text file that contains the vectorizer features. + - `Visualize Activation.png`: A PNG file that contains the visualization of the model activation. + - `Visualize t-SNE.png`: A PNG file that contains the visualization of the model t-SNE. + - `Weight Distribution.png`: A PNG file that contains the weight distribution of the model. diff --git a/CODE/VulnScan/tools/_study_network.py b/CODE/VulnScan/tools/_study_network.py new file mode 100644 index 0000000..907c857 --- /dev/null +++ b/CODE/VulnScan/tools/_study_network.py @@ -0,0 +1,624 @@ +from __future__ import annotations + +import os +import os.path +import random +from collections import OrderedDict +from configparser import ConfigParser +from os import mkdir +from typing import Any + +import joblib +import matplotlib.pyplot as plt +import networkx as nx +import numpy as np +import plotly.graph_objects as go +import seaborn as sns +import torch +import torch.nn as nn +from faker import Faker +from numpy import ndarray, dtype +from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer +from sklearn.manifold import TSNE +from torch import device +from torch.utils.data import DataLoader, TensorDataset +from torchviz import make_dot +from tqdm import tqdm + + +# Example of DataLoader for loss landscape (dummy dataset for visualization) +class DummyDataset(torch.utils.data.Dataset): + """ + A dummy dataset for generating synthetic data for visualization purposes. + + Attributes: + num_samples (int): Number of samples in the dataset. + input_dim (int): Dimension of the input data. + data (list): List of generated data samples. + labels (list): List of labels corresponding to the data samples. + """ + + def __init__(self, num_samples: int = 100, input_dim: int = 10000): + """ + Initializes the DummyDataset with the specified number of samples and input dimension. + + Args: + num_samples (int): Number of samples to generate. + input_dim (int): Dimension of the input data. + """ + self.num_samples = num_samples + self.input_dim = input_dim + self.data: list[str] = [] + self.labels: list[int] = [] + faker = Faker() + for _ in range(num_samples): + if random.random() < 0.05: # 5% chance to include sensitive data + self.data.append(f"Name: {faker.name()}, SSN: {faker.ssn()}, Address: {faker.address()}") + self.labels.append(1) # Label as sensitive + else: + self.data.append(faker.text(max_nb_chars=100)) # Non-sensitive data + self.labels.append(0) # Label as non-sensitive + + def __len__(self) -> int: + """ + Returns the number of samples in the dataset. + + Returns: + int: Number of samples in the dataset. + """ + return self.num_samples + + def __getitem__(self, idx: int) -> tuple[torch.Tensor, torch.Tensor]: + """ + Retrieves the data and label at the specified index. + + Args: + idx (int): Index of the data and label to retrieve. + + Returns: + tuple: A tuple containing the data tensor and label tensor. 
+ """ + data = self.data[idx] + label = self.labels[idx] + # Convert data to tensor of ASCII values and pad to input_dim + data_tensor = torch.tensor([ord(c) for c in data], dtype=torch.float32) + if len(data_tensor) < self.input_dim: + padding = torch.zeros(self.input_dim - len(data_tensor)) + data_tensor = torch.cat((data_tensor, padding)) + else: + data_tensor = data_tensor[:self.input_dim] + label_tensor = torch.tensor(label, dtype=torch.long) + return data_tensor, label_tensor + + +def load_data(text_data: list[str], vectorizer_to_load: TfidfVectorizer | CountVectorizer) -> DataLoader: + """ + Vectorizes the text data and creates a DataLoader for it. + + Args: + text_data (list of str): The text data to be vectorized. + vectorizer_to_load: The vectorizer to use for transforming the text data. + + Returns: + DataLoader: A DataLoader containing the vectorized text data and dummy labels. + """ + # Vectorize the text data + X = vectorizer_to_load.transform(text_data) + # Create a dummy label for visualization (replace with real labels if available) + y = np.zeros(len(text_data)) + # Convert to torch tensors + X_tensor = torch.tensor(X.toarray(), dtype=torch.float32) + y_tensor = torch.tensor(y, dtype=torch.long) + dataset = TensorDataset(X_tensor, y_tensor) + return DataLoader(dataset, batch_size=32, shuffle=True) + + +def visualize_weight_distribution(model_to_load: torch.nn.Module): + # Access weights of the first layer + weights = model_to_load[0].weight.detach().cpu().numpy() # Move tensor to CPU before conversion to numpy + plt.hist(weights.flatten(), bins=50) + plt.title("Weight Distribution - First Layer") + plt.xlabel("Weight Value") + plt.ylabel("Frequency") + plt.savefig("NN features/Weight Distribution.png") + plt.close() + + +def visualize_activations(model_to_load: torch.nn.Module, input_tensor: torch.Tensor): + # Check the device of the model + device_va = next(model_to_load.parameters()).device + + # Move the input tensor to the same device as the model + input_tensor = input_tensor.to(device_va) + + activations = [] + + # noinspection PyUnusedLocal + def hook_fn(module, inputx, output): + # Hook function to extract intermediate layer activations + activations.append(output) + + model_to_load[0].register_forward_hook(hook_fn) # Register hook on first layer + + # Perform a forward pass + _ = model_to_load(input_tensor) + activation = activations[0].detach().cpu().numpy() # Move activations to CPU + + # Plot activations as a bar chart + plt.figure(figsize=(10, 6)) + plt.bar(range(len(activation[0])), activation[0]) + plt.title("Activation Values - First Layer") + plt.xlabel("Neuron Index") + plt.ylabel("Activation Value") + plt.savefig("NN features/Visualize Activation.png") + plt.close() + + +def visualize_tsne(model_to_load: torch.nn.Module, dataloader: DataLoader): + # Get the device of the model + device_va = next(model_to_load.parameters()).device + + model_to_load.eval() # Set the model to evaluation mode + + features = [] + labels = [] + + with torch.no_grad(): + for data, target in dataloader: + # Move data and target to the same device as the model + data, target = data.to(device_va), target.to(device_va) + + # Extract features (output of the model) + output = model_to_load(data) + features.append(output.cpu().numpy()) # Move output to CPU for concatenation + labels.append(target.cpu().numpy()) # Move target to CPU for concatenation + + # Stack all batches + features = np.vstack(features) + labels = np.hstack(labels) + + # Determine suitable perplexity + num_samples 
= features.shape[0] + perplexity = min(30, num_samples - 1) # Ensure perplexity < num_samples + + # Apply t-SNE + tsne = TSNE(n_components=2, random_state=42, perplexity=perplexity) + reduced_features = tsne.fit_transform(features) + + # Plot the t-SNE results + plt.figure(figsize=(10, 8)) + scatter = plt.scatter(reduced_features[:, 0], reduced_features[:, 1], c=labels, cmap='viridis', alpha=0.7) + plt.colorbar(scatter, label="Class") + plt.title("t-SNE Visualization of Features") + plt.xlabel("t-SNE Dimension 1") + plt.ylabel("t-SNE Dimension 2") + plt.savefig("NN features/Visualize t-SNE.png") + plt.close() + + +# Main function to run all visualizations +def plot_many_graphs(): + print("Starting synthetic data generation...") + # Load data + faker = Faker() + + # Generate sensitive examples + sensitive_data = [ + f"Name: {faker.name()}, SSN: {faker.ssn()}, Address: {faker.address()}", + f"Credit Card: {faker.credit_card_number()}, Expiry: {faker.credit_card_expire()}, CVV: {faker.credit_card_security_code()}", + f"Patient: {faker.name()}, Condition: {faker.text(max_nb_chars=20)}", + f"Password: {faker.password()}", + f"Email: {faker.email()}", + f"Phone: {faker.phone_number()}", + f"Medical Record: {faker.md5()}", + f"Username: {faker.user_name()}", + f"IP: {faker.ipv4()}", + ] + + # Generate non-sensitive examples + non_sensitive_data = [ + faker.text(max_nb_chars=50) for _ in range(50000) + ] + + data_text = non_sensitive_data + (sensitive_data * 15) + random.shuffle(data_text) + print("Loaded data for visualization.") + dataloader = load_data(data_text, vectorizer) + + # Visualizations + print("Creating visualizations...") + visualize_weight_distribution(model) + + # For activations, use a sample from the dataloader + print("Creating activation visualizations...") + sample_input = next(iter(dataloader))[0] + visualize_activations(model, sample_input) + + print("Creating t-SNE visualization - May take a long time...") + visualize_tsne(model, dataloader) + + print("Completed.") + + +# Visualize feature importance (dummy example for visualization) and save as SVG +def visualize_feature_importance(TOKENS: list[str], FEATURE_IMPORTANCE: float | ndarray[Any, dtype[np.floating]], + FILENAME: str = "Plot.svg"): + # Limit the number of tokens to visualize + TOKENS = TOKENS[:1000] + FEATURE_IMPORTANCE = FEATURE_IMPORTANCE[:1000] + + plt.figure(figsize=(len(TOKENS) * 0.5, 6)) + sns.barplot(x=TOKENS, y=FEATURE_IMPORTANCE, palette="coolwarm", hue=TOKENS, legend=False) + plt.title("Feature Importance") + plt.xlabel("Tokens") + plt.ylabel("Importance") + plt.xticks(rotation=45) + plt.savefig(FILENAME, format="svg") + plt.close() # Close the plot to release memory + + +# Function to visualize the loss landscape as an interactive 3D object +def plot_loss_landscape_3d(MODEL: torch.nn.Module, DATA_LOADER: DataLoader, CRITERION: torch.nn.Module, + GRID_SIZE: int = 200, EPSILON: float = 0.01, FILENAME: str = "Plot.html"): + MODEL.eval() # Set model to evaluation mode + param = next(MODEL.parameters()) # Use the first parameter for landscape perturbations + param_flat = param.view(-1) + + # Define perturbation directions u and v + u = torch.randn_like(param_flat).view(param.shape).to(param.device) + v = torch.randn_like(param_flat).view(param.shape).to(param.device) + + # Normalize perturbations + u = EPSILON * u / torch.norm(u) + v = EPSILON * v / torch.norm(v) + + # Create grid + x = np.linspace(-1, 1, GRID_SIZE) + y = np.linspace(-1, 1, GRID_SIZE) + loss_values = np.zeros((GRID_SIZE, GRID_SIZE)) + + # 
Iterate through the grid to compute losses + for i, dx in enumerate(x): + print(f"Computing loss for row {i + 1}/{GRID_SIZE}...") + for j, dy in enumerate(y): + print(f" Computing loss for column {j + 1}/{GRID_SIZE}...") + param.data += dx * u + dy * v # Apply perturbation + loss = 0 + + # Compute loss for all batches in data loader + for batch in DATA_LOADER: + inputs, targets = batch + inputs = inputs.to(param.device) + targets = targets.to(param.device) + outputs = MODEL(inputs) + loss += CRITERION(outputs, targets).item() + + loss_values[i, j] = loss # Store the loss + param.data -= dx * u + dy * v # Revert perturbation + + # Create a meshgrid for plotting + X, Y = np.meshgrid(x, y) + + # Plot the 3D surface using Plotly + fig = go.Figure(data=[go.Surface(z=loss_values, x=X, y=Y, colorscale="Viridis")]) + fig.update_layout( + title="Loss Landscape (Interactive 3D)", + scene=dict( + xaxis_title="Perturbation in u", + yaxis_title="Perturbation in v", + zaxis_title="Loss", + ), + ) + + # Save as an interactive HTML file + fig.write_html(FILENAME) + print(f"3D loss landscape saved as {FILENAME}") + + +def main_plot(): + # Instantiate data loader + print("Creating dummy data loader...") + dummy_data_loader = DataLoader(DummyDataset(), batch_size=32) + + # Define loss criterion + print("Defining loss criterion...") + criterion = torch.nn.CrossEntropyLoss() + + # Visualizations + print("Creating visualizations...") + tokens = vectorizer.get_feature_names_out() + + # Feature importance + # Max number of features to visualize is 3000 due to image constraints + print( + f"Visualizing feature importance - This may take a while for {len(tokens[:NUMBER_OF_FEATURES]) + 1} tokens...") + feature_importance = np.random.rand(len(tokens[:NUMBER_OF_FEATURES])) # Example random importance + visualize_feature_importance(tokens[:NUMBER_OF_FEATURES], feature_importance, + FILENAME="NN features/feature_importance.svg") + + # Loss landscape + print("Visualizing loss landscape - This may take a while...") + plot_loss_landscape_3d(model, dummy_data_loader, criterion, FILENAME="NN features/loss_landscape_3d.html") + + # Set model to evaluation mode, and plot many graphs + print("Setting model to evaluation mode...") + model.eval() # Set the model to evaluation mode + plot_many_graphs() + + +def save_data(model_to_use: torch.nn.Module, input_size: tuple[int, Any] | int, batch_size: int = -1, + device_to_use: str = "cuda"): + def register_hook(module: torch.nn.Module): + + def hook(modules: torch.nn.Module, inputs: (torch.nn.Module, tuple[torch.Tensor]), output: torch.Tensor): + class_name = str(modules.__class__).split(".")[-1].split("'")[0] + module_idx = len(summaries) + + m_key = "%s-%i" % (class_name, module_idx + 1) + summaries[m_key] = OrderedDict() + summaries[m_key]["input_shape"] = list(inputs[0].size()) + summaries[m_key]["input_shape"][0] = batch_size + if isinstance(output, (list, tuple)): + summaries[m_key]["output_shape"] = [ + [-1] + list(o.size())[1:] for o in output + ] + else: + summaries[m_key]["output_shape"] = list(output.size()) + summaries[m_key]["output_shape"][0] = batch_size + + params = 0 + if hasattr(modules, "weight") and hasattr(modules.weight, "size"): + params += torch.prod(torch.LongTensor(list(modules.weight.size()))) + summaries[m_key]["trainable"] = modules.weight.requires_grad + if hasattr(modules, "bias") and hasattr(modules.bias, "size"): + params += torch.prod(torch.LongTensor(list(modules.bias.size()))) + summaries[m_key]["nb_params"] = params + + if ( + not 
isinstance(module, nn.Sequential) + and not isinstance(module, nn.ModuleList) + and not (module == model_to_use) + ): + hooks.append(module.register_forward_hook(hook)) + + device_to_use = device_to_use.lower() + assert device_to_use in [ + "cuda", + "cpu", + ], "Input device is not valid, please specify 'cuda' or 'cpu'" + + if device_to_use == "cuda" and torch.cuda.is_available(): + dtype_to_use = torch.cuda.FloatTensor + else: + dtype_to_use = torch.FloatTensor + + # multiple inputs to the network + if isinstance(input_size, tuple): + input_size = [input_size] + + # batch_size of 2 for batch norm + x = [torch.rand(2, *in_size).type(dtype_to_use) for in_size in input_size] + + # create properties + summaries = OrderedDict() + hooks = [] + + # register hook + model_to_use.apply(register_hook) + + # make a forward pass + model_to_use(*x) + + # remove these hooks + for h in hooks: + h.remove() + + # Save the summary + mode = "a" if os.path.exists("NN features/Model Summary.txt") else "w" + with open('NN features/Model Summary.txt', mode) as vf_ms: + vf_ms.write("----------------------------------------------------------------\n") + line_new = "{:>20} {:>25} {:>15}".format("Layer (type)", "Output Shape", "Param #") + vf_ms.write(f"{line_new}\n") + vf_ms.write("================================================================\n") + total_params = 0 + total_output = 0 + trainable_params = 0 + for layer in summaries: + # input_shape, output_shape, trainable, nb_params + line_new = "{:>20} {:>25} {:>15}".format( + layer, + str(summaries[layer]["output_shape"]), + "{0:,}".format(summaries[layer]["nb_params"]), + ) + total_params += summaries[layer]["nb_params"] + total_output += np.prod(summaries[layer]["output_shape"]) + if "trainable" in summaries[layer]: + if summaries[layer]["trainable"]: + trainable_params += summaries[layer]["nb_params"] + vf_ms.write(f"{line_new}\n") + + # assume 4 bytes/number (float on cuda). + total_input_size = abs(np.prod(input_size) * batch_size * 4. / (1024 ** 2.)) + total_output_size = abs(2. * total_output * 4. / (1024 ** 2.)) # x2 for gradients + total_params_size = abs(total_params.numpy() * 4. 
/ (1024 ** 2.)) + total_size = total_params_size + total_output_size + total_input_size + + vf_ms.write("\n================================================================") + vf_ms.write("\nTotal params: {0:,}".format(total_params)) + vf_ms.write("\nTrainable params: {0:,}".format(trainable_params)) + vf_ms.write("\nNon-trainable params: {0:,}".format(total_params - trainable_params)) + vf_ms.write("\n----------------------------------------------------------------") + vf_ms.write("\nInput size (MB): %0.2f" % total_input_size) + vf_ms.write("\nForward/backward pass size (MB): %0.2f" % total_output_size) + vf_ms.write("\nParams size (MB): %0.2f" % total_params_size) + vf_ms.write("\nEstimated Total Size (MB): %0.2f" % total_size) + vf_ms.write("\n----------------------------------------------------------------\n") + + +def save_graph(): + # Create a directed graph + G = nx.DiGraph() + + def add_edges_bulk(layer_names: str, weight_matrices: np.ndarray[np.float32]): + """Efficiently add edges to the graph with progress tracking.""" + threshold = 0.1 # Adjust this threshold as needed + significant_weights = np.abs(weight_matrices) > threshold + rows, cols = np.where(significant_weights) + weights = weight_matrices[rows, cols] + + # Use tqdm for progress tracking + edge_count = len(rows) + with tqdm(total=edge_count, desc=f"Processing {layer_names}", unit="edges") as pbar: + for row, col, weight in zip(rows, cols, weights): + in_node = f"{layer_names}_in_{col}" + out_node = f"{layer_names}_out_{row}" + G.add_edge(in_node, out_node, weight=weight) + pbar.update(1) + + # Process parameters + for name, param in model.named_parameters(): + if 'weight' in name: + layer_name = name.split('.')[0] + weight_matrix = param.data.cpu().numpy() + + # Add edges with progress bar + add_edges_bulk(layer_name, weight_matrix) + + # Draw the graph + print("Writing the graph to a file...") + nx.write_gexf(G, "NN features/Neural Network Nodes Graph.gexf") + + +def setup_environment(): + print("Visualizing the model and vectorizer features...") + print("This may take a while, please wait.") + + if not os.path.exists('NN features'): + mkdir('NN features') + + +def load_vectorizer(): + vectorizer_load = joblib.load(vectorizer_path) + feature_names = vectorizer_load.get_feature_names_out() + with open('NN features/Vectorizer features.txt', 'w') as file: + file.write(f"Number of features: {len(feature_names)}\n\n") + file.write('\n'.join(feature_names)) + return vectorizer_load + + +def visualize_top_features(top_n: int = 90): + feature_names = vectorizer.get_feature_names_out() + sorted_indices = vectorizer.idf_.argsort()[:top_n] + top_features = [feature_names[i] for i in sorted_indices] + top_idf_scores = vectorizer.idf_[sorted_indices] + + plt.figure(figsize=(20, 12)) # Increase the figure size + sns.barplot(x=top_idf_scores, y=top_features) + plt.title('Top 90 Features by IDF Score') + plt.xlabel('IDF Score') + plt.ylabel('Feature') + + # Save the plot as a vector graphic + plt.savefig('NN features/Top_90_Features.svg', format='svg') + plt.close() + + +def load_model() -> tuple[Any, device]: + device_load = torch.device("cuda" if torch.cuda.is_available() else "cpu") + model_load = torch.load(model_path, weights_only=False) + model_load.to(device_load) + return model_load, device_load + + +def save_model_state_dict(): + with open('NN features/Model state dictionary.txt', 'w') as file: + file.write("Model's state dictionary:\n\n") + for param_tensor in model.state_dict(): + 
file.write(f"\n{param_tensor}\t{model.state_dict()[param_tensor].size()}") + + +def generate_model_visualization(): + dummy_input = torch.randn(1, vectorizer.vocabulary_.__len__()).to(device) + model_viz = make_dot(model(dummy_input), params=dict(model.named_parameters()), show_attrs=True, show_saved=True) + model_viz.format = 'png' + model_viz.render(filename='NN features/Model Visualization', format='png') + + +def cleanup_temp_files(): + if os.path.exists("NN features/Model Visualization"): + os.remove("NN features/Model Visualization") + + +def model_summary(): + mode = "a" if os.path.exists("NN features/Model Summary.txt") else "w" + with open("NN features/Model Summary.txt", mode) as file: + file.write(str(model)) + + +if __name__ == '__main__': + # Print the welcome message + print("===========================================================================================") + print("= This script will visualize the features of the model and vectorizer. =") + print("= Please ensure that the model and vectorizer files are present in the specified paths. =") + print("= The visualization will be saved in the 'NN features' directory. =") + print("= This script will take a while to run, please be patient. =") + print("===========================================================================================") + + # Read the config file + print("\n\nReading config file and setting up...") + config = ConfigParser() + config.read('../../config.ini') + + setup_environment() + + # Load the paths from the config file + vectorizer_path = config.get('VulnScan.study Settings', 'vectorizer_path') + model_path = config.get('VulnScan.study Settings', 'model_path') + NUMBER_OF_FEATURES = int(config.get('VulnScan.study Settings', 'number_of_features')) + + # Check if the paths exist + if not os.path.exists(vectorizer_path): + print(f"Vectorizer file not found. Please double check the path {vectorizer_path}.") + exit(1) + if not os.path.exists(model_path): + print(f"Model file not found. Please double check the path {model_path}.") + exit(1) + + # Load the vectorizer and model + vectorizer = load_vectorizer() + visualize_top_features() + model, device = load_model() + # Save the model summary, state dictionary, and visualization + save_data(model, input_size=(1, vectorizer.vocabulary_.__len__())) + save_model_state_dict() + generate_model_visualization() + cleanup_temp_files() + save_graph() + print("Model visualization and summary have been saved to the 'NN features' directory.") + + # Check if GPU is available + if not os.path.exists('NN features'): + os.mkdir('NN features') + + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + print(f"Using device: {device}") + + # Load vectorizer (change the path to your vectorizer .pkl file) + vectorizer_path = "../Vectorizer .3n3.pkl" + model_path = "../Model SenseMini .3n3.pth" + + # Load vectorizer + print(f"Reloading vectorizer from: {vectorizer_path}") + with open(vectorizer_path, "rb") as f: + vectorizer = joblib.load(f) + + # Load model and move to the appropriate device (GPU/CPU) + print(f"Reloading model from: {model_path}") + model = torch.load(model_path, weights_only=False) + model.to(device) # Move model to GPU or CPU + + model_summary() + main_plot() +else: + raise ImportError("This training script is meant to be run directly " + "and cannot be imported. 
Please execute it as a standalone script.") diff --git a/CODE/VulnScan/tools/_test_gpu_acceleration.py b/CODE/VulnScan/tools/_test_gpu_acceleration.py index 0b82f52..86397e7 100644 --- a/CODE/VulnScan/tools/_test_gpu_acceleration.py +++ b/CODE/VulnScan/tools/_test_gpu_acceleration.py @@ -21,4 +21,8 @@ def check_gpu(): print(f"Error initializing CUDA: {err}") -check_gpu() +if __name__ == '__main__': + check_gpu() +else: + raise ImportError("This training script is meant to be run directly " + "and cannot be imported. Please execute it as a standalone script.") diff --git a/CODE/VulnScan/tools/_vectorizer.py b/CODE/VulnScan/tools/_vectorizer.py index 1ad7da8..25e5727 100644 --- a/CODE/VulnScan/tools/_vectorizer.py +++ b/CODE/VulnScan/tools/_vectorizer.py @@ -9,6 +9,15 @@ def load_data(data_paths: str | os.PathLike) -> list[str]: + """ + Load data from the specified path(s). + + Args: + data_paths (str | os.PathLike): Path to a directory or a file containing data. + + Returns: + list[str]: List of strings, each representing the content of a file. + """ data = [] if os.path.isdir(data_paths): for root, _, files in os.walk(data_paths): @@ -24,6 +33,18 @@ def load_data(data_paths: str | os.PathLike) -> list[str]: def choose_vectorizer(vectorizer_types: str) -> TfidfVectorizer | CountVectorizer: + """ + Choose and return a vectorizer based on the specified type. + + Args: + vectorizer_types (str): Type of vectorizer to use ('tfidf' or 'count'). + + Returns: + TfidfVectorizer | CountVectorizer: The chosen vectorizer. + + Raises: + ValueError: If an unsupported vectorizer type is specified. + """ print("Vectorizer Type: ", vectorizer_types) print("Vectorizing Data...") if vectorizer_types == 'tfidf': @@ -34,6 +55,14 @@ def choose_vectorizer(vectorizer_types: str) -> TfidfVectorizer | CountVectorize def main(data_paths: str, vectorizer_types: str, output_paths: str): + """ + Main function to load data, choose a vectorizer, fit the vectorizer to the data, and save the vectorizer. + + Args: + data_paths (str): Path to the data. + vectorizer_types (str): Type of vectorizer to use ('tfidf' or 'count'). + output_paths (str): Path to save the fitted vectorizer. + """ data = load_data(data_paths) vectorizer = choose_vectorizer(vectorizer_types) vectorizer.fit(data) @@ -51,3 +80,6 @@ def main(data_paths: str, vectorizer_types: str, output_paths: str): if not os.path.exists(output_path): os.makedirs(output_path) main(data_path, vectorizer_type, output_path) +else: + raise ImportError("This training script is meant to be run directly " + "and cannot be imported. Please execute it as a standalone script.") diff --git a/CODE/VulnScan/v2-deprecated/_generate_data.py b/CODE/VulnScan/v2-deprecated/_generate_data.py index 5992524..778c1c2 100644 --- a/CODE/VulnScan/v2-deprecated/_generate_data.py +++ b/CODE/VulnScan/v2-deprecated/_generate_data.py @@ -9,9 +9,15 @@ fake = Faker() -# Function to generate a sensitive file with real sensitive information -@deprecated(reason="This function is only used for generating sensitive data for testing purposes for v2 trainers, v2 trainers are deprecated now, use v3 trainers.", removal_version="3.3.0") +@deprecated(reason="This function is only used for generating sensitive data for testing purposes for v2 trainers, v2 trainers are deprecated now, use v3 trainers.", removal_version="3.4.0") def create_sensitive_file(file_path: str, max_size: int): + """ + Generate a sensitive file with real sensitive information. 
+ + Args: + file_path (str): The path where the file will be saved. + max_size (int): The maximum size of the file in bytes. + """ content = "" # Generate sensitive data using Faker content += f"Name: {fake.name()}\n" @@ -30,9 +36,15 @@ def create_sensitive_file(file_path: str, max_size: int): f.write(content) -# Function to generate a normal file with non-sensitive data -@deprecated(reason="This function is only used for generating normal data for testing purposes for v2 trainers, v2 trainers are deprecated now, use v3 trainers.", removal_version="3.3.0") +@deprecated(reason="This function is only used for generating normal data for testing purposes for v2 trainers, v2 trainers are deprecated now, use v3 trainers.", removal_version="3.4.0") def create_normal_file(file_path: str, max_size: int): + """ + Generate a normal file with non-sensitive data. + + Args: + file_path (str): The path where the file will be saved. + max_size (int): The maximum size of the file in bytes. + """ content = "" # Add random text while len(content.encode('utf-8')) < max_size: @@ -42,9 +54,15 @@ def create_normal_file(file_path: str, max_size: int): f.write(content) -# Function to generate a mix file with both normal and sensitive data -@deprecated(reason="This function is only used for generating mixed data for testing purposes for v2 trainers, v2 trainers are deprecated now, use v3 trainers.", removal_version="3.3.0") +@deprecated(reason="This function is only used for generating mixed data for testing purposes for v2 trainers, v2 trainers are deprecated now, use v3 trainers.", removal_version="3.4.0") def create_mix_file(file_path: str, max_size: int): + """ + Generate a mix file with both normal and sensitive data. + + Args: + file_path (str): The path where the file will be saved. + max_size (int): The maximum size of the file in bytes. + """ content = "" # Add a mix of normal and sensitive data while len(content.encode('utf-8')) < max_size: @@ -59,9 +77,15 @@ def create_mix_file(file_path: str, max_size: int): f.write(content) -# Function to create random files (Normal, Mix, Sensitive) -@deprecated(reason="This function is only used for generating random files for testing purposes for v2 trainers, v2 trainers are deprecated now, use v3 trainers.", removal_version="3.3.0") +@deprecated(reason="This function is only used for generating random files for testing purposes for v2 trainers, v2 trainers are deprecated now, use v3 trainers.", removal_version="3.4.0") def create_random_files(directories: str, num_file: int = 100): + """ + Create random files (Normal, Mix, Sensitive). + + Args: + directories (str): The directory where the files will be saved. + num_file (int): The number of files to generate. + """ os.makedirs(directories, exist_ok=True) for i in range(num_file): @@ -79,4 +103,8 @@ def create_random_files(directories: str, num_file: int = 100): print(f"Created {file_type} file: {file_name}") -create_random_files(SAVE_DIRECTORY, num_file=1000000) +if __name__ == "__main__": + create_random_files(SAVE_DIRECTORY, num_file=1000000) +else: + raise ImportError("This training script is meant to be run directly " + "and cannot be imported. 
Please execute it as a standalone script.") diff --git a/CODE/VulnScan/v2-deprecated/_train.py b/CODE/VulnScan/v2-deprecated/_train.py index 4cfa624..5daa78f 100644 --- a/CODE/VulnScan/v2-deprecated/_train.py +++ b/CODE/VulnScan/v2-deprecated/_train.py @@ -2,7 +2,6 @@ import logging import os -from os import mkdir import joblib import matplotlib.pyplot as plt @@ -20,6 +19,7 @@ from sklearn.svm import SVC from torch.utils.data import DataLoader, TensorDataset from transformers import BertTokenizer, BertForSequenceClassification + from logicytics import deprecated # Configure logging @@ -40,9 +40,17 @@ # --------------------------------------- -@deprecated(reason="This function is used to load data from a directory. Its for training v2 models, which is now deprecated, use train.py v3 instead.", removal_version="3.3.0") +@deprecated(reason="This function is used to load data from a directory. Its for training v2 models, which is now deprecated, use train.py v3 instead.", removal_version="3.2.0") def load_data(data_dir: str) -> tuple[list[str], np.ndarray]: - """Loads text data and labels from the directory.""" + """ + Loads text data and labels from the directory. + + Args: + data_dir (str): The directory containing the data files. + + Returns: + tuple[list[str], np.ndarray]: A tuple containing the list of texts and the corresponding labels. + """ texts, labels = [], [] for file_name in os.listdir(data_dir): with open(os.path.join(data_dir, file_name), "r", encoding="utf-8") as f: @@ -53,9 +61,18 @@ def load_data(data_dir: str) -> tuple[list[str], np.ndarray]: return texts, np.array(labels) -@deprecated(reason="This function is used to evaluate a model. Its for training v2 models, which is now deprecated, use train.py v3 instead.", removal_version="3.3.0") +@deprecated(reason="This function is used to evaluate a model. Its for training v2 models, which is now deprecated, use train.py v3 instead.", removal_version="3.2.0") def evaluate_model(y_true: np.ndarray, y_pred: np.ndarray) -> tuple[float, float, float, float, float]: - """Evaluates the model using standard metrics.""" + """ + Evaluates the model using standard metrics. + + Args: + y_true (np.ndarray): The true labels. + y_pred (np.ndarray): The predicted labels. + + Returns: + tuple[float, float, float, float, float]: A tuple containing accuracy, precision, recall, F1 score, and ROC-AUC score. + """ accuracy = accuracy_score(y_true, y_pred) precision = precision_score(y_true, y_pred, zero_division=1) recall = recall_score(y_true, y_pred, zero_division=1) @@ -71,8 +88,15 @@ def evaluate_model(y_true: np.ndarray, y_pred: np.ndarray) -> tuple[float, float # --------------------------------------- -@deprecated(reason="This function is used to save progress graphs. Its for training v2 models, which is now deprecated, use train.py v3 instead.", removal_version="3.3.0") +@deprecated(reason="This function is used to save progress graphs. Its for training v2 models, which is now deprecated, use train.py v3 instead.", removal_version="3.2.0") def save_progress_graph(accuracies: list[float], filename: str = "training_progress.png"): + """ + Saves a graph of training progress. + + Args: + accuracies (list[float]): List of accuracies for each epoch. + filename (str): The filename to save the graph as. 
+ """ plt.figure(figsize=(8, 6)) plt.plot(range(1, len(accuracies) + 1), accuracies, marker='o', label="Training Accuracy") plt.xlabel("Epochs") @@ -84,10 +108,19 @@ def save_progress_graph(accuracies: list[float], filename: str = "training_progr plt.close() -@deprecated(reason="This function is used to train xgboost. Its for training v2 models, which is now deprecated, use train.py v3 instead.", removal_version="3.3.0") +@deprecated(reason="This function is used to train xgboost. Its for training v2 models, which is now deprecated, use train.py v3 instead.", removal_version="3.2.0") def train_xgboost(X_train: np.ndarray, X_test: np.ndarray, y_train: np.ndarray, y_test: np.ndarray, SAVE_DIR: str): - """Trains a Gradient Boosting Classifier (XGBoost) with GPU.""" + """ + Trains a Gradient Boosting Classifier (XGBoost) with GPU. + + Args: + X_train (np.ndarray): Training data features. + X_test (np.ndarray): Testing data features. + y_train (np.ndarray): Training data labels. + y_test (np.ndarray): Testing data labels. + SAVE_DIR (str): Directory to save the trained model. + """ logging.info("Enabling GPU acceleration...") model = xgb.XGBClassifier(tree_method='hist', device=DEVICE) # Enable GPU acceleration logging.info("GPU acceleration enabled.") @@ -100,11 +133,25 @@ def train_xgboost(X_train: np.ndarray, X_test: np.ndarray, logging.info("Model saved as xgboost_model.pkl") -@deprecated(reason="This function is used to train bert. Its for training v2 models, which is now deprecated, use train.py v3 instead.", removal_version="3.3.0") +@deprecated(reason="This function is used to train bert. Its for training v2 models, which is now deprecated, use train.py v3 instead.", removal_version="3.2.0") def train_bert(X_train: np.ndarray, X_test: np.ndarray, y_train: np.ndarray, y_test: np.ndarray, MAX_LEN: int, LEARNING_RATE: float, BATCH_SIZE: int, EPOCHS: int, SAVE_DIR: str, MODEL_PATH: str): - """Trains a BERT model with GPU support.""" + """ + Trains a BERT model with GPU support. + + Args: + X_train (np.ndarray): Training data features. + X_test (np.ndarray): Testing data features. + y_train (np.ndarray): Training data labels. + y_test (np.ndarray): Testing data labels. + MAX_LEN (int): Maximum length of the sequences. + LEARNING_RATE (float): Learning rate for the optimizer. + BATCH_SIZE (int): Batch size for training. + EPOCHS (int): Number of epochs for training. + SAVE_DIR (str): Directory to save the trained model. + MODEL_PATH (str): Path to the pre-trained BERT model. + """ logging.info("Loading BERT tokenizer...") tokenizer = BertTokenizer.from_pretrained(BERT_MODEL_NAME) train_encodings = tokenizer(list(X_train), truncation=True, padding=True, max_length=MAX_LEN, return_tensors="pt") @@ -154,14 +201,34 @@ def train_bert(X_train: np.ndarray, X_test: np.ndarray, y_train: np.ndarray, class LSTMModel(nn.Module): + @deprecated(reason="This class is used to define an LSTM model. Its for training v2 models, which is now deprecated, use _train.py v3 instead.", removal_version="3.2.0") def __init__(self, vocab_size: int, embedding_dim: int = 128, hidden_dim: int = 128, output_dim: int = 1): + """ + Initializes the LSTM model. + + Args: + vocab_size (int): Size of the vocabulary. + embedding_dim (int): Dimension of the embedding layer. + hidden_dim (int): Dimension of the hidden layer. + output_dim (int): Dimension of the output layer. 
+ """ super(LSTMModel, self).__init__() self.embedding = nn.Embedding(vocab_size, embedding_dim) self.lstm = nn.LSTM(embedding_dim, hidden_dim, bidirectional=True, batch_first=True) self.fc = nn.Linear(hidden_dim * 2, output_dim) # Bidirectional, so multiply by 2 self.sigmoid = nn.Sigmoid() + @deprecated(reason="This class is used to define an LSTM model. Its for training v2 models, which is now deprecated, use _train.py v3 instead.", removal_version="3.2.0") def forward(self, x: torch.Tensor) -> torch.Tensor: + """ + Defines the forward pass of the LSTM model. + + Args: + x (torch.Tensor): Input tensor. + + Returns: + torch.Tensor: Output tensor. + """ x = self.embedding(x) lstm_out, _ = self.lstm(x) x = self.fc(lstm_out[:, -1, :]) @@ -169,11 +236,24 @@ def forward(self, x: torch.Tensor) -> torch.Tensor: return x -@deprecated(reason="This function is used to train lstm. Its for training v2 models, which is now deprecated, use train.py v3 instead.", removal_version="3.3.0") +@deprecated(reason="This function is used to train lstm. Its for training v2 models, which is now deprecated, use train.py v3 instead.", removal_version="3.2.0") def train_lstm(X_train: np.ndarray, X_test: np.ndarray, y_train: np.ndarray, y_test: np.ndarray, MAX_FEATURES: int, LEARNING_RATE: float, BATCH_SIZE: int, EPOCHS: int, SAVE_DIR: str): - """Trains an LSTM model using PyTorch with GPU support.""" + """ + Trains an LSTM model using PyTorch with GPU support. + + Args: + X_train (np.ndarray): Training data features. + X_test (np.ndarray): Testing data features. + y_train (np.ndarray): Training data labels. + y_test (np.ndarray): Testing data labels. + MAX_FEATURES (int): Maximum number of features for the vectorizer. + LEARNING_RATE (float): Learning rate for the optimizer. + BATCH_SIZE (int): Batch size for training. + EPOCHS (int): Number of epochs for training. + SAVE_DIR (str): Directory to save the trained model. + """ logging.info("Training LSTM...") logging.info("Vectorizing text data...") vectorizer = TfidfVectorizer(max_features=MAX_FEATURES) @@ -232,10 +312,22 @@ def train_lstm(X_train: np.ndarray, X_test: np.ndarray, y_train: np.ndarray, # --------------------------------------- # noinspection DuplicatedCode -@deprecated(reason="This function is used to train NeuralNetworks/SVM. Its for training v2 models, which is now deprecated, use train.py v3 instead.", removal_version="3.3.0") +@deprecated(reason="This function is used to train NeuralNetworks/SVM. Its for training v2 models, which is now deprecated, use train.py v3 instead.", removal_version="3.2.0") def train_nn_svm(MODEL: str, EPOCHS: int, SAVE_DIR: str, MAX_FEATURES: int, TEST_SIZE: float | int, MAX_ITER: int, RANDOM_STATE: int): + """ + Trains a Neural Network or SVM model with hyperparameter tuning. + + Args: + MODEL (str): The type of model to train ('svm' or 'nn'). + EPOCHS (int): Number of epochs for training. + SAVE_DIR (str): Directory to save the trained model. + MAX_FEATURES (int): Maximum number of features for the vectorizer. + TEST_SIZE (float | int): Proportion of the dataset to include in the test split. + MAX_ITER (int): Maximum number of iterations for the model. + RANDOM_STATE (int): Random state for reproducibility. + """ if MODEL not in ["svm", "nn"]: logging.error(f"Invalid model type: {MODEL}. Please choose 'svm' or 'nn'.") return @@ -321,10 +413,25 @@ def train_nn_svm(MODEL: str, EPOCHS: int, SAVE_DIR: str, logging.info("Training complete.") -@deprecated(reason="This function is used setup training. 
Its for training v2 models, which is now deprecated, use train.py v3 instead.", removal_version="3.3.0") +@deprecated(reason="This function is used setup training. Its for training v2 models, which is now deprecated, use train.py v3 instead.", removal_version="3.2.0") def train_model_blx(MODEL_TYPE: str, SAVE_DIR: str, EPOCHS: int, BATCH_SIZE: int, LEARNING_RATE: float, MAX_FEATURES: int, MAX_LEN: int, TEST_SIZE: float | int, RANDOM_STATE: int, MODEL_PATH_BERT: str = None): + """ + Sets up and trains a model based on the specified type. + + Args: + MODEL_TYPE (str): The type of model to train ('xgboost', 'bert', 'lstm'). + SAVE_DIR (str): Directory to save the trained model. + EPOCHS (int): Number of epochs for training. + BATCH_SIZE (int): Batch size for training. + LEARNING_RATE (float): Learning rate for the optimizer. + MAX_FEATURES (int): Maximum number of features for the vectorizer. + MAX_LEN (int): Maximum length of the sequences (for BERT). + TEST_SIZE (float | int): Proportion of the dataset to include in the test split. + RANDOM_STATE (int): Random state for reproducibility. + MODEL_PATH_BERT (str, optional): Path to the pre-trained BERT model. + """ # Create save directory if it doesn't exist os.makedirs(SAVE_DIR, exist_ok=True) @@ -355,9 +462,19 @@ def train_model_blx(MODEL_TYPE: str, SAVE_DIR: str, EPOCHS: int, BATCH_SIZE: int # noinspection DuplicatedCode -@deprecated(reason="This function is used to train RandomForest. Its for training v2 models, which is now deprecated, use train.py v3 instead.", removal_version="3.3.0") +@deprecated(reason="This function is used to train RandomForest. Its for training v2 models, which is now deprecated, use train.py v3 instead.", removal_version="3.2.0") def train_rfc(SAVE_DIR: str, EPOCHS: int, TEST_SIZE: float | int, N_ESTIMATORS: int, RANDOM_STATE: int): + """ + Trains a Random Forest Classifier. + + Args: + SAVE_DIR (str): Directory to save the trained model. + EPOCHS (int): Number of epochs for training. + TEST_SIZE (float | int): Proportion of the dataset to include in the test split. + N_ESTIMATORS (int): Number of trees in the forest. + RANDOM_STATE (int): Random state for reproducibility. + """ logging.info("Training model...") # Load data @@ -391,7 +508,7 @@ def train_rfc(SAVE_DIR: str, EPOCHS: int, TEST_SIZE: float | int, # Save progress plot if not os.path.exists(SAVE_DIR): - mkdir(SAVE_DIR) + os.mkdir(SAVE_DIR) save_progress_graph(accuracies, filename=os.path.join(SAVE_DIR, "training_progress.png")) # Save model checkpoint @@ -427,3 +544,6 @@ def train_rfc(SAVE_DIR: str, EPOCHS: int, TEST_SIZE: float | int, train_model_blx(MODEL_TYPE="bert", SAVE_DIR=r"C:\Users\Hp\Desktop\Model Tests\Model Sense .2b1", EPOCHS=5, BATCH_SIZE=8, LEARNING_RATE=5e-5, MAX_FEATURES=5000, MAX_LEN=128, TEST_SIZE=0.2, RANDOM_STATE=42, MODEL_PATH_BERT="../bert-base-uncased-model") +else: + raise ImportError("This training script is meant to be run directly " + "and cannot be imported. 
Please execute it as a standalone script.") diff --git a/CODE/VulnScan/v3/_generate_data.py b/CODE/VulnScan/v3/_generate_data.py index 0b28b6d..0bc8dd3 100644 --- a/CODE/VulnScan/v3/_generate_data.py +++ b/CODE/VulnScan/v3/_generate_data.py @@ -1,98 +1,96 @@ +from __future__ import annotations + import os import random import string import configparser +from Logicytics import Log, DEBUG from faker import Faker -# Initialize Faker -fake = Faker() - -# Read configuration -config = configparser.ConfigParser() -config.read('../../config.ini') - -# Load configuration values -config = config['VulnScan.generate Settings'] -EXTENSIONS_ALLOWED = config.get('extensions', '.txt').split(',') -SAVE_PATH = config.get('save_path', '.') -CODE_NAME = config.get('code_name', 'Sense') -SIZE_VARIATION = float(config.get('size_variation', '0.1')) - -# Ensure the save directory exists -os.makedirs(SAVE_PATH, exist_ok=True) - -# Set default file size and number of files -DEFAULT_FILE_NUM = 10000 -DEFAULT_MIN_FILE_SIZE = 10 * 1024 # 10 KB -DEFAULT_MAX_FILE_SIZE = 10 * 1024 # 10 KB - -# File configuration based on CODE_NAME -if CODE_NAME == 'Sense': - FILE_NUM = DEFAULT_FILE_NUM * 5 - MIN_FILE_SIZE = DEFAULT_MIN_FILE_SIZE * 5 - MAX_FILE_SIZE = DEFAULT_MAX_FILE_SIZE * 5 -elif CODE_NAME == 'SenseNano': - FILE_NUM = 5 - MIN_FILE_SIZE = int(DEFAULT_MIN_FILE_SIZE * 0.5) - MAX_FILE_SIZE = int(DEFAULT_MAX_FILE_SIZE * 0.5) -elif CODE_NAME == 'SenseMacro': - FILE_NUM = DEFAULT_FILE_NUM * 100 - MIN_FILE_SIZE = DEFAULT_MIN_FILE_SIZE - MAX_FILE_SIZE = DEFAULT_MAX_FILE_SIZE -elif CODE_NAME == 'SenseMini': - FILE_NUM = DEFAULT_FILE_NUM - MIN_FILE_SIZE = DEFAULT_MIN_FILE_SIZE - MAX_FILE_SIZE = DEFAULT_MAX_FILE_SIZE -else: # Custom configuration - MIN_FILE_SIZE = int(config['min_file_size'].replace('KB', '')) * 1024 - MAX_FILE_SIZE = int(config['max_file_size'].replace('KB', '')) * 1024 - FILE_NUM = DEFAULT_FILE_NUM - -print(f"Generating {FILE_NUM} files with sizes between {MIN_FILE_SIZE} and {MAX_FILE_SIZE} bytes") - - -# Function to generate random file names -def generate_random_filename(extensions, suffix_x): + +logger = Log( + {"log_level": DEBUG, + "filename": "../../../ACCESS/LOGS/VulnScan_Train.log", + "colorlog_fmt_parameters": + "%(log_color)s%(levelname)-8s%(reset)s %(yellow)s%(asctime)s %(blue)s%(message)s", + } +) + + +def generate_random_filename(extensions: str, suffix_x: str = '') -> str: + """ + Generate a random filename with the given extension and optional suffix. + + Args: + extensions (str): The file extension. + suffix_x (str, optional): An optional suffix to add to the filename. + + Returns: + str: The generated random filename. 
+ """ return ''.join(random.choices(string.ascii_letters + string.digits, k=10)) + suffix_x + extensions -# Function to generate content based on file extension -def generate_content_for_extension(extensions, size): - # Define sensitive data generators - sensitive_data_generators = { - '.txt': lambda: random.choice([ - fake.credit_card_number(), - fake.ssn(), - fake.password(), - fake.email(), - fake.phone_number(), - fake.iban(), - ]), - '.json': lambda: { - 'credit_card': fake.credit_card_number(), - 'email': fake.email(), - 'phone': fake.phone_number(), - 'password': fake.password(), - 'iban': fake.iban(), - }, - '.csv': lambda: ",".join([ - fake.credit_card_number(), - fake.email(), - fake.phone_number(), - ]), - '.xml': lambda: f"{random.choice([fake.credit_card_number(), fake.iban(), fake.password()])}", - '.log': lambda: f"{fake.date_time()} - Sensitive Data: {random.choice([fake.email(), fake.password(), fake.ipv4_private()])}", - 'default': lambda: fake.text(max_nb_chars=50) - } - - # Define sensitivity chances +def generate_content_for_extension(extensions: str, size: int | float) -> tuple[str, str]: + """ + Generate content based on the file extension and size. + + Args: + extensions (str): The file extension. + size (int | float): The size of the content to generate. + + Returns: + tuple[str, str]: The generated content and a suffix indicating the sensitivity level. + """ full_sensitive_chance = float(config.get('full_sensitive_chance', '0.1')) partial_sensitive_chance = float(config.get('partial_sensitive_chance', '0.3')) - def generate_sensitive_data(): + def generate_sensitive_data() -> str: + """ + Generate sensitive data based on the file extension. + + Returns: + str: The generated sensitive data. + """ + sensitive_data_generators = { + '.txt': lambda: random.choice([ + fake.credit_card_number(), + fake.ssn(), + fake.password(), + fake.email(), + fake.phone_number(), + fake.iban(), + ]), + '.json': lambda: { + 'credit_card': fake.credit_card_number(), + 'email': fake.email(), + 'phone': fake.phone_number(), + 'password': fake.password(), + 'iban': fake.iban(), + }, + '.csv': lambda: ",".join([ + fake.credit_card_number(), + fake.email(), + fake.phone_number(), + ]), + '.xml': lambda: f"{random.choice([fake.credit_card_number(), fake.iban(), fake.password()])}", + '.log': lambda: f"{fake.date_time()} - Sensitive Data: {random.choice([fake.email(), fake.password(), fake.ipv4_private()])}", + 'default': lambda: fake.text(max_nb_chars=50) + } + return sensitive_data_generators.get(extensions, sensitive_data_generators['default'])() - def generate_regular_content(extension_grc, sizes): + def generate_regular_content(extension_grc: str, sizes: int | float) -> str: + """ + Generate regular content based on the file extension and size. + + Args: + extension_grc (str): The file extension. + sizes (int | float): The size of the content to generate. + + Returns: + str: The generated regular content. 
+ """ if extension_grc == '.txt': content_grc = fake.text(max_nb_chars=sizes) elif extension_grc == '.json': @@ -111,12 +109,10 @@ def generate_regular_content(extension_grc, sizes): elif extension_grc == '.log': content_grc = "\n".join([f"{fake.date_time()} - {fake.text(50)}" for _ in range(sizes // 100)]) else: - # Default to plain text for unknown extensions content_grc = fake.text(max_nb_chars=sizes) return content_grc if random.random() < full_sensitive_chance: - # Generate fully sensitive content if extensions == '.json': contents = str([generate_sensitive_data() for _ in range(size // 500)]) elif extensions in ['.txt', '.log', '.xml']: @@ -127,12 +123,10 @@ def generate_regular_content(extension_grc, sizes): contents = "\n".join([generate_sensitive_data() for _ in range(size // 500)]) return contents, '-sensitive' else: - # Generate regular content with optional partial sensitivity regular_content = generate_regular_content(extensions, size) if random.random() < partial_sensitive_chance: - sensitive_data_count = max(1, size // 500) # Embed some sensitive data + sensitive_data_count = max(1, size // 500) sensitive_data = [generate_sensitive_data() for _ in range(sensitive_data_count)] - # Blend sensitive data into the regular content regular_content_lines = regular_content.split("\n") for _ in range(sensitive_data_count): insert_position = random.randint(0, len(regular_content_lines) - 1) @@ -144,8 +138,16 @@ def generate_regular_content(extension_grc, sizes): return contents, '-none' -# Function to generate file content -def generate_file_content(extensions): +def generate_file_content(extensions: str) -> tuple[str, str]: + """ + Generate file content based on the file extension. + + Args: + extensions (str): The file extension. + + Returns: + tuple[str, str]: The generated content and a suffix indicating the sensitivity level. + """ size = random.randint(MIN_FILE_SIZE, MAX_FILE_SIZE) if SIZE_VARIATION != 0: variation_choice = random.choice([1, 2, 3, 4]) @@ -157,18 +159,66 @@ def generate_file_content(extensions): size = abs(int(size + (size / SIZE_VARIATION))) elif variation_choice == 4: size = abs(int(size - (size / SIZE_VARIATION))) - print(f"Generating {extensions} content of size {size} bytes") + logger.debug(f"Generating {extensions} content of size {size} bytes") return generate_content_for_extension(extensions, size) -# Generate files -for i in range(FILE_NUM): - print(f"Generating file {i + 1}/{FILE_NUM}") - extension = random.choice(EXTENSIONS_ALLOWED).strip() - content, suffix = generate_file_content(extension) - filename = generate_random_filename(extension, suffix) - filepath = os.path.join(SAVE_PATH, filename) - with open(filepath, 'w', encoding='utf-8') as f: - f.write(content) - -print(f"Generated {FILE_NUM} files in {SAVE_PATH}") +if __name__ == "__main__": + """ + Main function to generate files based on the configuration. 
+ """ + fake = Faker() + + config = configparser.ConfigParser() + config.read('../../config.ini') + + config = config['VulnScan.generate Settings'] + EXTENSIONS_ALLOWED = config.get('extensions', '.txt').split(',') + SAVE_PATH = config.get('save_path', '.') + CODE_NAME = config.get('code_name', 'Sense') + SIZE_VARIATION = float(config.get('size_variation', '0.1')) + + os.makedirs(SAVE_PATH, exist_ok=True) + + DEFAULT_FILE_NUM = 10000 + DEFAULT_MIN_FILE_SIZE = 10 * 1024 + DEFAULT_MAX_FILE_SIZE = 10 * 1024 + + if CODE_NAME == 'Sense': + FILE_NUM = DEFAULT_FILE_NUM * 5 + MIN_FILE_SIZE = DEFAULT_MIN_FILE_SIZE * 5 + MAX_FILE_SIZE = DEFAULT_MAX_FILE_SIZE * 5 + elif CODE_NAME == 'SenseNano': + FILE_NUM = 5 + MIN_FILE_SIZE = int(DEFAULT_MIN_FILE_SIZE * 0.5) + MAX_FILE_SIZE = int(DEFAULT_MAX_FILE_SIZE * 0.5) + elif CODE_NAME == 'SenseMacro': + logger.warning("Generating 100 times more files and 100 times larger files") + logger.warning("This is being deprecated in version 3.2.0") + FILE_NUM = DEFAULT_FILE_NUM * 100 + MIN_FILE_SIZE = DEFAULT_MIN_FILE_SIZE + MAX_FILE_SIZE = DEFAULT_MAX_FILE_SIZE + elif CODE_NAME == 'SenseMini': + FILE_NUM = DEFAULT_FILE_NUM + MIN_FILE_SIZE = DEFAULT_MIN_FILE_SIZE + MAX_FILE_SIZE = DEFAULT_MAX_FILE_SIZE + else: + MIN_FILE_SIZE = int(config['min_file_size'].replace('KB', '')) * 1024 + MAX_FILE_SIZE = int(config['max_file_size'].replace('KB', '')) * 1024 + FILE_NUM = DEFAULT_FILE_NUM + + logger.info(f"Generating {FILE_NUM} files with sizes between {MIN_FILE_SIZE} and {MAX_FILE_SIZE} bytes") + + for i in range(FILE_NUM): + logger.debug(f"Generating file {i + 1}/{FILE_NUM}") + extension = random.choice(EXTENSIONS_ALLOWED).strip() + content, suffix = generate_file_content(extension) + filename = generate_random_filename(extension, suffix) + filepath = os.path.join(SAVE_PATH, filename) + with open(filepath, 'w', encoding='utf-8') as f: + f.write(content) + + logger.info(f"Generated {FILE_NUM} files in {SAVE_PATH}") +else: + raise ImportError("This training script is meant to be run directly " + "and cannot be imported. 
Please execute it as a standalone script.") diff --git a/CODE/VulnScan/v3/_train.py b/CODE/VulnScan/v3/_train.py index c9fa7ee..f9bfb2a 100644 --- a/CODE/VulnScan/v3/_train.py +++ b/CODE/VulnScan/v3/_train.py @@ -178,6 +178,7 @@ def select_model_from_traditional(model_name: str, logger.error(f"Invalid model name: {model_name}") exit(1) + def train_traditional_model(model_name: str, epochs: int, save_model_path: str): @@ -343,49 +344,99 @@ def train_model( train_traditional_model(model_name, epochs, save_model_path) -# Config file reading and setting constants -logger.info("Reading config file") -config = ConfigParser() -config.read('../../config.ini') -MODEL_NAME = config.get('VulnScan.train Settings', 'model_name') -TRAINING_PATH = config.get('VulnScan.train Settings', 'train_data_path') -EPOCHS = int(config.get('VulnScan.train Settings', 'epochs')) -BATCH_SIZE = int(config.get('VulnScan.train Settings', 'batch_size')) -LEARN_RATE = float(config.get('VulnScan.train Settings', 'learning_rate')) -CUDA = config.getboolean('VulnScan.train Settings', 'use_cuda') -SAVE_PATH = config.get('VulnScan.train Settings', 'save_model_path') - -# Load Data -logger.info(f"Loading data from {TRAINING_PATH}") -texts, labels = [], [] -for filename in os.listdir(TRAINING_PATH): - with open(os.path.join(config.get('VulnScan.train Settings', 'train_data_path'), filename), 'r', - encoding='utf-8') as file: - texts.append(file.read()) - labels.append(1 if '-sensitive' in filename else 0) - logger.debug(f"Loaded data from {filename} with label {labels[-1]}") - -# Split Data -logger.info("Splitting data into training and validation sets") -X_train, X_val, y_train, y_val = train_test_split(texts, - labels, - test_size=0.2, - random_state=42) - -# Train Model -try: - train_model(model_name=MODEL_NAME, - epochs=EPOCHS, - batch_size=BATCH_SIZE, - learning_rate=LEARN_RATE, - save_model_path=SAVE_PATH, - use_cuda=CUDA) -except FileNotFoundError as e: - logger.error(f"File Not Found Error in training model: {e}") - exit(1) -except AttributeError as e: - logger.error(f"Attribute Error in training model: {e}") - exit(1) -except Exception as e: - logger.error(f"Error in training model: {e}") - exit(1) +def validate_data(): + """ + Validates the data by checking if the variables are of the correct type. 
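The relocated configuration block in the hunk below casts every setting by hand (`int(config.get(...))`, `float(config.get(...))`). A minimal sketch of the same reads using `ConfigParser`'s typed getters, assuming the same `../../config.ini` layout; the fallback values here are illustrative only:

```python
# Hedged sketch: reading the same [VulnScan.train Settings] keys with ConfigParser's
# typed getters instead of manual int()/float() casts. Fallbacks are illustrative.
from configparser import ConfigParser

config = ConfigParser()
config.read('../../config.ini')

section = 'VulnScan.train Settings'
MODEL_NAME = config.get(section, 'model_name', fallback='NeuralNetwork')
EPOCHS = config.getint(section, 'epochs', fallback=10)
BATCH_SIZE = config.getint(section, 'batch_size', fallback=32)
LEARN_RATE = config.getfloat(section, 'learning_rate', fallback=0.001)
CUDA = config.getboolean(section, 'use_cuda', fallback=False)

print(MODEL_NAME, EPOCHS, BATCH_SIZE, LEARN_RATE, CUDA)
```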
+ """ + if not isinstance(EPOCHS, int) or EPOCHS <= 0: + logger.error("EPOCHS must be a positive integer") + exit(1) + if not isinstance(BATCH_SIZE, int) or BATCH_SIZE <= 0: + logger.error("BATCH_SIZE must be a positive integer") + exit(1) + if not isinstance(LEARN_RATE, float) or not (0 < LEARN_RATE < 1): + logger.error("LEARN_RATE must be a float between 0 and 1") + exit(1) + if not isinstance(CUDA, bool): + logger.error("CUDA must be a boolean") + exit(1) + + allowed_models = ["NeuralNetwork", "LogReg", "RandomForest", "ExtraTrees", "GBM", "XGBoost", "DecisionTree", "NaiveBayes"] + if MODEL_NAME not in allowed_models: + logger.error(f"MODEL_NAME must be one of: {', '.join(allowed_models)}") + exit(1) + if not os.path.exists(TRAINING_PATH): + logger.error(f"Training data path {TRAINING_PATH} does not exist") + exit(1) + if not os.path.exists(os.path.dirname(SAVE_PATH)): + logger.error(f"Save model path {SAVE_PATH} does not exist") + exit(1) + + +if __name__ == "__main__": + # Config file reading and setting constants + logger.info("Reading config file") + config = ConfigParser() + config.read('../../config.ini') + + MODEL_NAME = config.get('VulnScan.train Settings', 'model_name') + TRAINING_PATH = config.get('VulnScan.train Settings', 'train_data_path') + EPOCHS = int(config.get('VulnScan.train Settings', 'epochs')) + BATCH_SIZE = int(config.get('VulnScan.train Settings', 'batch_size')) + LEARN_RATE = float(config.get('VulnScan.train Settings', 'learning_rate')) + CUDA = config.getboolean('VulnScan.train Settings', 'use_cuda') + SAVE_PATH = config.get('VulnScan.train Settings', 'save_model_path') + + validate_data() + + # Load Data + logger.info(f"Loading data from {TRAINING_PATH}") + texts, labels = [], [] + for filename in os.listdir(TRAINING_PATH): + with open(os.path.join(config.get('VulnScan.train Settings', 'train_data_path'), filename), 'r', + encoding='utf-8') as file: + texts.append(file.read()) + labels.append(1 if '-sensitive' in filename else 0) + logger.debug(f"Loaded data from {filename} with label {labels[-1]}") + + # Split Data + logger.info("Splitting data into training and validation sets") + X_train, X_val, y_train, y_val = train_test_split(texts, + labels, + test_size=0.2, + random_state=42) + + # Train Model + try: + train_model(model_name=MODEL_NAME, + epochs=EPOCHS, + batch_size=BATCH_SIZE, + learning_rate=LEARN_RATE, + save_model_path=SAVE_PATH, + use_cuda=CUDA) + except RuntimeError as e: + if "CUDA" in str(e): + logger.error(f"GPU error: {e}. Falling back to CPU...") + train_model(model_name=MODEL_NAME, + epochs=EPOCHS, + batch_size=BATCH_SIZE, + learning_rate=LEARN_RATE, + save_model_path=SAVE_PATH, + use_cuda=False) + else: + logger.error(f"Runtime Error in training model: {e}") + exit(1) + except FileNotFoundError as e: + logger.error(f"Training data or model files not found: {e}." + f" Please check if all required files exist.") + exit(1) + except AttributeError as e: + logger.error(f"Invalid model configuration or missing attributes: {e}." + f" Please verify model settings.") + exit(1) + except Exception as e: + logger.error(f"Error in training model: {e}") + exit(1) +else: + raise ImportError("This training script is meant to be run directly " + "and cannot be imported. 
Please execute it as a standalone script.") diff --git a/CODE/_dev.py b/CODE/_dev.py index 47687b6..18755ea 100644 --- a/CODE/_dev.py +++ b/CODE/_dev.py @@ -66,7 +66,7 @@ def dev_checks() -> None: Performs a series of checks to ensure that the developer has followed the required guidelines and best practices. Returns: bool: True if all checks pass, otherwise False. - """ + """ # Create the necessary directories if they do not exist FileManagement.mkdir() @@ -88,9 +88,9 @@ def dev_checks() -> None: # Get the list of code files in the current directory files = Get.list_of_code_files(".") - added_files = [f for f in files if f not in CURRENT_FILES] - removed_files = [f for f in CURRENT_FILES if f not in files] - normal_files = [f for f in files if f in CURRENT_FILES] + added_files = [f.replace('"', '') for f in files if f not in CURRENT_FILES] + removed_files = [f.replace('"', '') for f in CURRENT_FILES if f not in files] + normal_files = [f.replace('"', '') for f in files if f in CURRENT_FILES] # Print the list of added, removed, and normal files in color print("\n".join([f"\033[92m+ {file}\033[0m" for file in added_files])) # Green + diff --git a/CODE/config.ini b/CODE/config.ini index f24190c..65a2e6d 100644 --- a/CODE/config.ini +++ b/CODE/config.ini @@ -9,8 +9,8 @@ delete_old_logs = false [System Settings] # Do not play with these settings unless you know what you are doing -version = 3.0.0 -files = "browser_miner.ps1, cmd_commands.py, dir_list.py, event_log.py, Logicytics.py, log_miner.py, media_backup.py, netadapter.ps1, packet_sniffer.py, property_scraper.ps1, registry.py, sensitive_data_miner.py, ssh_miner.py, sys_internal.py, tasklist.py, tree.ps1, vulnscan.py, wifi_stealer.py, window_feature_miner.ps1, wmic.py, _debug.py, _dev.py, _extra.py, logicytics\Checks.py, logicytics\Execute.py, logicytics\FileManagement.py, logicytics\Flag.py, logicytics\Get.py, logicytics\Logger.py, logicytics\__init__.py, VulnScan\tools\_test_gpu_acceleration.py, VulnScan\tools\_vectorizer.py, VulnScan\v2-deprecated\_generate_data.py, VulnScan\v2-deprecated\_train.py, VulnScan\v3\_generate_data.py, VulnScan\v3\_train.py" +version = 3.1.0 +files = "browser_miner.ps1, cmd_commands.py, dir_list.py, dump_memory.py, event_log.py, Logicytics.py, log_miner.py, media_backup.py, netadapter.ps1, packet_sniffer.py, property_scraper.ps1, registry.py, sensitive_data_miner.py, ssh_miner.py, sys_internal.py, tasklist.py, tree.ps1, vulnscan.py, wifi_stealer.py, window_feature_miner.ps1, wmic.py, _debug.py, _dev.py, _extra.py, logicytics\Checks.py, logicytics\Execute.py, logicytics\FileManagement.py, logicytics\Flag.py, logicytics\Get.py, logicytics\Logger.py, logicytics\__init__.py, VulnScan\tools\_study_network.py, VulnScan\tools\_test_gpu_acceleration.py, VulnScan\tools\_vectorizer.py, VulnScan\v2-deprecated\_generate_data.py, VulnScan\v2-deprecated\_train.py, VulnScan\v3\_generate_data.py, VulnScan\v3\_train.py" ################################################### # The following settings are for specific modules # @@ -27,29 +27,10 @@ timeout = 10 ################################################### -[VulnScan.train Settings] -# The following settings are for the Train module for training models -# NeuralNetwork seems to be the best choice for this task -# Options: "NeuralNetwork", "LogReg", -# "RandomForest", "ExtraTrees", "GBM", -# "XGBoost", "DecisionTree", "NaiveBayes" -model_name = NeuralNetwork -# General Training Parameters -epochs = 10 -batch_size = 32 -learning_rate = 0.001 -use_cuda = true - -# Paths to train 
and save data
-train_data_path = C:\Users\Hp\Desktop\Model Tests\Model Data\GeneratedData
-# If all models are to be trained, this is the path to save all models,
-# and will be appended with the model codename and follow naming convention
-save_model_path = C:\Users\Hp\Desktop\Model Tests\Model SenseMini
-
 [VulnScan.generate Settings]
 # The following settings are for the Generate module for fake training data
 extensions = .txt, .log, .md, .csv, .json, .xml, .html, .yaml, .ini, .pdf, .docx, .xlsx, .pptx
-save_path = C:\Users\Hp\Desktop\Model Tests\Generated Data
+save_path = PATH
 # Options include:
 # 'Sense' - Generates 50k files, each 25KB in size.
 # 'SenseNano' - Generates 5 files, each 5KB in size.
@@ -79,11 +60,44 @@ partial_sensitive_chance = 0.2
 # Use the vectorizer supplied for any v3 model on SenseMini
 # The path to the data to vectorize, either a file or a directory
-data_path = C:\Users\Hp\Desktop\Model Tests\Model Data\GeneratedData
+data_path = PATH
 # The path to save the vectorized data - It will automatically be appended '\Vectorizer.pkl'
 # Make sure the path is a directory, and it exists
-output_path = C:\Users\Hp\Desktop\Model Tests\Model Sense - Vectorizer
+output_path = PATH
 # Vectorizer to use, options include:
 # tfidf or count - The code for the training only supports tfidf - we advise to use tfidf
 vectorizer_type = tfidf
+
+[VulnScan.train Settings]
+# The following settings are for the Train module for training models
+# NeuralNetwork seems to be the best choice for this task
+# Options: "NeuralNetwork", "LogReg",
+# "RandomForest", "ExtraTrees", "GBM",
+# "XGBoost", "DecisionTree", "NaiveBayes"
+model_name = NeuralNetwork
+# General Training Parameters
+epochs = 10
+batch_size = 32
+learning_rate = 0.001
+use_cuda = true
+
+# Paths to train and save data
+train_data_path = PATH
+# If all models are to be trained, this is the path to save all models,
+# and will be appended with the model codename and follow the naming convention
+save_model_path = PATH
+
+[VulnScan.study Settings]
+# Here are the basics of the study module
+# This is useful for generating graphs and data that may help in understanding the model
+# Everything is already available pre-studied online, so this step is not necessary
+# But it is useful for understanding the model locally
+# All files will be saved here; this can't be changed - the PATH is "NN features/"
+
+# These are the paths to the model and the vectorizer
+model_path = PATH
+vectorizer_path = PATH
+# Number of features to visualise in the SVG bar graph; the maximum is 3000 due to limitations
+# Placing -1 will visualise the first 3000 features. The bar will be a color gradient heatmap.
+number_of_features = -1
diff --git a/CODE/dump_memory.py b/CODE/dump_memory.py
new file mode 100644
index 0000000..927c40d
--- /dev/null
+++ b/CODE/dump_memory.py
@@ -0,0 +1,218 @@
+import datetime
+import platform
+import ctypes
+import os
+import psutil
+from logicytics import Log, DEBUG
+
+if __name__ == "__main__":
+    log = Log({"log_level": DEBUG})
+    # Constants
+    PROCESS_QUERY_INFORMATION = 0x0400
+    PROCESS_VM_READ = 0x0010
+    MEM_COMMIT = 0x1000
+    PAGE_READWRITE = 0x04
+
+
+# Function to save RAM content snapshot to a file
+@log.function
+def dump_ram_content():
+    """
+    Capture the current state of the system's RAM and write it to a file.
+
+    This function gathers memory statistics, system-specific details, and writes
+    the information to a file named 'Ram_Snapshot.txt'.
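Referring back to the `[VulnScan.study Settings]` block above: `number_of_features` accepts `-1` to mean "first 3000 features", with 3000 as a hard ceiling. A hedged sketch of how a study script might normalise that value; `resolve_feature_count` is an illustrative helper, not code from `_study_network.py`:

```python
# Hedged sketch of interpreting number_of_features from [VulnScan.study Settings]:
# -1 (or anything above the ceiling) is clamped to the documented 3000-feature limit.
MAX_FEATURES = 3000

def resolve_feature_count(number_of_features: int, available: int) -> int:
    if number_of_features < 0:           # -1 means "visualise the first 3000 features"
        number_of_features = MAX_FEATURES
    return min(number_of_features, MAX_FEATURES, available)

print(resolve_feature_count(-1, available=10000))   # 3000
print(resolve_feature_count(50, available=10000))   # 50
```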
+ """ + try: + # Generate a timestamp for the file + dump_file = "Ram_Snapshot.txt" + + # Gather memory statistics using psutil + memory_info = psutil.virtual_memory() + swap_info = psutil.swap_memory() + + # Get system-specific details + system_info = ( + "System Information:\n" + "===================================\n" + f"OS: {platform.system()} {platform.release()}\n" + f"Architecture: {platform.architecture()[0]}\n" + f"Processor: {platform.processor()}\n" + f"Machine: {platform.machine()}\n\n" + ) + + # Prepare content to dump + dump_content = ( + f"RAM Snapshot - {datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}\n" + "===================================\n" + f"{system_info}" + f"Total Memory: {memory_info.total / (1024 ** 3):.2f} GB\n" + f"Available Memory: {memory_info.available / (1024 ** 3):.2f} GB\n" + f"Used Memory: {memory_info.used / (1024 ** 3):.2f} GB\n" + f"Memory Usage: {memory_info.percent}%\n\n" + f"Swap Total: {swap_info.total / (1024 ** 3):.2f} GB\n" + f"Swap Used: {swap_info.used / (1024 ** 3):.2f} GB\n" + f"Swap Free: {swap_info.free / (1024 ** 3):.2f} GB\n" + f"Swap Usage: {swap_info.percent}%\n" + ) + + # Write the content to the file + with open(dump_file, "w", encoding="utf-8") as file: + file.write(dump_content) + + log.info(f"RAM snapshot saved to: {dump_file}") + + except Exception as e: + log.error(f"Error capturing RAM snapshot: {e}") + + +# Define structures for SystemInfo +class SystemInfo(ctypes.Structure): + # noinspection PyUnresolvedReferences + """ + A ctypes Structure to hold system information. + + Attributes: + wProcessorArchitecture (ctypes.c_ushort): Processor architecture. + wReserved (ctypes.c_ushort): Reserved. + dwPageSize (ctypes.c_ulong): Page size. + lpMinimumApplicationAddress (ctypes.c_void_p): Minimum application address. + lpMaximumApplicationAddress (ctypes.c_void_p): Maximum application address. + dwActiveProcessorMask (ctypes.POINTER(ctypes.c_ulong)): Active processor mask. + dwNumberOfProcessors (ctypes.c_ulong): Number of processors. + dwProcessorType (ctypes.c_ulong): Processor type. + dwAllocationGranularity (ctypes.c_ulong): Allocation granularity. + wProcessorLevel (ctypes.c_ushort): Processor level. + wProcessorRevision (ctypes.c_ushort): Processor revision. + """ + _fields_ = [ + ("wProcessorArchitecture", ctypes.c_ushort), + ("wReserved", ctypes.c_ushort), + ("dwPageSize", ctypes.c_ulong), + ("lpMinimumApplicationAddress", ctypes.c_void_p), + ("lpMaximumApplicationAddress", ctypes.c_void_p), + ("dwActiveProcessorMask", ctypes.POINTER(ctypes.c_ulong)), + ("dwNumberOfProcessors", ctypes.c_ulong), + ("dwProcessorType", ctypes.c_ulong), + ("dwAllocationGranularity", ctypes.c_ulong), + ("wProcessorLevel", ctypes.c_ushort), + ("wProcessorRevision", ctypes.c_ushort), + ] + + +# Define BasicMemInfo +class BasicMemInfo(ctypes.Structure): + # noinspection PyUnresolvedReferences + """ + A ctypes Structure to hold basic memory information. + + Attributes: + BaseAddress (ctypes.c_void_p): Base address. + AllocationBase (ctypes.c_void_p): Allocation base. + AllocationProtect (ctypes.c_ulong): Allocation protection. + RegionSize (ctypes.c_size_t): Region size. + State (ctypes.c_ulong): State. + Protect (ctypes.c_ulong): Protection. + Type (ctypes.c_ulong): Type. 
+ """ + _fields_ = [ + ("BaseAddress", ctypes.c_void_p), + ("AllocationBase", ctypes.c_void_p), + ("AllocationProtect", ctypes.c_ulong), + ("RegionSize", ctypes.c_size_t), + ("State", ctypes.c_ulong), + ("Protect", ctypes.c_ulong), + ("Type", ctypes.c_ulong), + ] + + +@log.function +def get_system_info() -> SystemInfo: + """ + Retrieve and return system information using the `GetSystemInfo` function from the Windows API. + + Returns: + SystemInfo: A `SystemInfo` structure containing details about the system's architecture, + processor, memory, and other attributes. + """ + system_info = SystemInfo() + ctypes.windll.kernel32.GetSystemInfo(ctypes.byref(system_info)) + return system_info + + +@log.function +def read_memory(): + """ + Read the memory of the current process and write the content to a file. + + This function opens the current process with the necessary permissions, + retrieves system information, and iterates through memory pages to read + """ + # Open current process with permissions + process = ctypes.windll.kernel32.OpenProcess(PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, False, os.getpid()) + if not process: + log.error("Unable to open process for reading.") + return + + # Get system info + system_info = get_system_info() + min_address = system_info.lpMinimumApplicationAddress + max_address = system_info.lpMaximumApplicationAddress + with open("SystemRam_Info.txt", "w") as sys_file: + sys_file.write("System Information:\n") + sys_file.write("===================================\n") + sys_file.write(f"Minimum Address: {min_address}\n") + sys_file.write(f"Maximum Address: {max_address}\n") + sys_file.write(f"Allocation Granularity: {system_info.dwAllocationGranularity}\n") + sys_file.write(f"Processor Architecture: {system_info.wProcessorArchitecture}\n") + sys_file.write(f"Number of Processors: {system_info.dwNumberOfProcessors}\n") + sys_file.write(f"Processor Type: {system_info.dwProcessorType}\n") + sys_file.write(f"Processor Level: {system_info.wProcessorLevel}\n") + sys_file.write(f"Processor Revision: {system_info.wProcessorRevision}\n") + sys_file.write(f"Page Size: {system_info.dwPageSize}\n") + sys_file.write(f"Active Processor Mask: {system_info.dwActiveProcessorMask.contents}\n") + sys_file.write(f"Reserved: {system_info.wReserved}\n") + sys_file.write("===================================\n") + sys_file.write(f"Raw SystemInfo: {system_info}\n") + sys_file.write("===================================\n") + log.debug(f"Memory Range: {min_address:#x} - {max_address:#x}") + + # Iterate through memory pages + memory_info = BasicMemInfo() + address = min_address + with open("Ram_Dump.txt", "w") as dump_file: + while address < max_address: + result = ctypes.windll.kernel32.VirtualQueryEx( + process, ctypes.c_void_p(address), ctypes.byref(memory_info), ctypes.sizeof(memory_info) + ) + if not result: + break + + # Check if the memory is committed and readable + if memory_info.State == MEM_COMMIT and memory_info.Protect == PAGE_READWRITE: + buffer = ctypes.create_string_buffer(memory_info.RegionSize) + bytes_read = ctypes.c_size_t() + ctypes.windll.kernel32.ReadProcessMemory( + process, + ctypes.c_void_p(memory_info.BaseAddress), + buffer, + memory_info.RegionSize, + ctypes.byref(bytes_read), + ) + dump_file.write(str(buffer.raw[: bytes_read.value])) + + address += memory_info.RegionSize + + # Close the process handle + ctypes.windll.kernel32.CloseHandle(process) + log.info("Memory dump complete. 
Saved to 'ram_dump.txt'.") + log.warning("Encoding is in HEX") + + +if __name__ == "__main__": + try: + log.info("Starting memory dump process...") + dump_ram_content() + read_memory() + except Exception as err: + log.error(f"Error during memory dump: {err}") diff --git a/CODE/logicytics/FileManagement.py b/CODE/logicytics/FileManagement.py index 188b134..07f9fc3 100644 --- a/CODE/logicytics/FileManagement.py +++ b/CODE/logicytics/FileManagement.py @@ -107,7 +107,9 @@ def __get_files_to_zip(path: str) -> list: list: A list of file and directory names to be zipped. """ excluded_extensions = (".py", ".exe", ".bat", ".ps1", ".pkl", ".pth") - excluded_prefixes = ("config.ini", "SysInternal_Suite", "__pycache__", "logicytics", "VulnScan") + excluded_prefixes = ("config.ini", "SysInternal_Suite", + "__pycache__", "logicytics", "VulnScan", + "Vectorizer features") return [ f for f in os.listdir(path) diff --git a/CODE/vulnscan.py b/CODE/vulnscan.py index 6d9ec78..e9cf5fe 100644 --- a/CODE/vulnscan.py +++ b/CODE/vulnscan.py @@ -6,20 +6,23 @@ import warnings import joblib +import numpy as np import torch from safetensors import safe_open from sklearn.feature_extraction.text import TfidfVectorizer -from tqdm import tqdm # Set up logging from logicytics import Log, DEBUG -# Use v3 models on this! Especially NN models - if __name__ == "__main__": - log = Log( - {"log_level": DEBUG} - ) + log = Log({"log_level": DEBUG}) + +log.info("Locking threads - Model and Vectorizer") +model_lock = threading.Lock() +vectorizer_lock = threading.Lock() + +model_to_use = None +vectorizer_to_use = None def load_model(model_path_to_load: str) -> safe_open | torch.nn.Module: @@ -42,12 +45,28 @@ def load_model(model_path_to_load: str) -> safe_open | torch.nn.Module: elif model_path_to_load.endswith('.pth'): with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=FutureWarning) - return torch.load(model_path_to_load) + return torch.load(model_path_to_load, weights_only=False) else: raise ValueError("Unsupported model file format. Use .pkl, .safetensors, or .pth") -def is_sensitive(model: torch.nn.Module, vectorizer: TfidfVectorizer, file_content: str) -> tuple[bool, float]: +def scan_path(model_path: str, scan_paths: str, vectorizer_path: str): + global model_to_use, vectorizer_to_use + try: + with model_lock: + if model_to_use is None: + log.info(f"Loading model from {model_path}") + model_to_use = load_model(model_path) + with vectorizer_lock: + if vectorizer_to_use is None: + log.info(f"Loading vectorizer from {vectorizer_path}") + vectorizer_to_use = joblib.load(vectorizer_path) + vulnscan(model_to_use, scan_paths, vectorizer_to_use) + except Exception as e: + log.error(f"Error scanning path {scan_paths}: {e}") + + +def is_sensitive(model: torch.nn.Module, vectorizer: TfidfVectorizer, file_content: str) -> tuple[bool, float, str]: """ Determine if the file content is sensitive using the provided model and vectorizer. @@ -57,7 +76,7 @@ def is_sensitive(model: torch.nn.Module, vectorizer: TfidfVectorizer, file_conte file_content (str): Content of the file to be analyzed. Returns: - tuple: (True if the content is sensitive, False otherwise, prediction probability). + tuple: (True if the content is sensitive, False otherwise, prediction probability, reason). 
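The updated `is_sensitive` docstring adds a `reason` string, which the body in the next hunk builds from the five highest-weighted TF-IDF features of the scanned document. A self-contained sketch of that `argsort` + `get_feature_names_out` step on a toy corpus (the corpus is illustrative):

```python
# Hedged sketch of the "reason" string: take the five highest-weighted TF-IDF
# features for a document and join their names.
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer

corpus = [
    "password and ssn stored in plain text",
    "weekly status meeting notes",
    "api key and secret token committed to the repo",
]
vectorizer = TfidfVectorizer()
matrix = vectorizer.fit_transform(corpus)

row = matrix.toarray()[0]                      # TF-IDF weights for the first document
top_features = np.argsort(row)[-5:]            # indices of the 5 largest weights
reason = ", ".join(vectorizer.get_feature_names_out()[i] for i in top_features)
print(reason)
```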
""" if isinstance(model, torch.nn.Module): device = torch.device("cuda" if torch.cuda.is_available() else "cpu") @@ -68,15 +87,19 @@ def is_sensitive(model: torch.nn.Module, vectorizer: TfidfVectorizer, file_conte features_tensor = torch.tensor(features.toarray(), dtype=torch.float32).to(device) prediction = model(features_tensor) probability = torch.softmax(prediction, dim=1).max().item() - return prediction.argmax(dim=1).item() == 1, probability + top_features = np.argsort(features.toarray()[0])[-5:] + reason = ", ".join([vectorizer.get_feature_names_out()[i] for i in top_features]) + return prediction.argmax(dim=1).item() == 1, probability, reason else: features = vectorizer.transform([file_content]) prediction = model.predict_proba(features) probability = prediction.max() - return model.predict(features)[0] == 1, probability + top_features = np.argsort(features.toarray()[0])[-5:] + reason = ", ".join([vectorizer.get_feature_names_out()[i] for i in top_features]) + return model.predict(features)[0] == 1, probability, reason -def scan_file(model: torch.nn.Module, vectorizer: TfidfVectorizer, file_path: str) -> tuple[bool, float]: +def scan_file(model: torch.nn.Module, vectorizer: TfidfVectorizer, file_path: str) -> tuple[bool, float, str]: """ Scan a single file to determine if it contains sensitive content. @@ -99,83 +122,38 @@ def scan_file(model: torch.nn.Module, vectorizer: TfidfVectorizer, file_path: st return is_sensitive(model, vectorizer, content) -def scan_directory(model: torch.nn.Module, vectorizer, dir_path: str) -> dict[str, tuple[bool, float]]: - """ - Scan all files in a directory to determine if they contain sensitive content. - - Args: - model: Machine learning model. - vectorizer: Vectorizer to transform file content. - dir_path (str): Path to the directory to be scanned. - - Returns: - dict: Dictionary with file paths as keys and (sensitivity, prediction probability) as values. - """ - results = {} - for roots, _, files_dir in os.walk(dir_path): - for file in tqdm(files_dir, desc="Scanning files", unit="file", leave=True): - file_path = os.path.join(roots, file) - if file.endswith(('.zip', '.rar', '.7z', '.tar', '.gz', '.tar.gz')): - continue - results[file_path] = scan_file(model, vectorizer, file_path) - - return results - - -def main(MODELS_PATH: str, SCAN_PATH: str, VECTORIZER_PATH: str): - """ - Main function to load the model and vectorizer, and scan the specified path. - Saves the paths of sensitive files to a file named "Sensitive_File_Paths.txt". - - Args: - MODELS_PATH (str): Path to the model file. - SCAN_PATH (str): Path to the file or directory to be scanned. - VECTORIZER_PATH (str): Path to the vectorizer file. - """ - log.info(f"Loading model from {MODELS_PATH}") - model = load_model(MODELS_PATH) - log.info(f"Loading vectorizer from {VECTORIZER_PATH}") - vectorizer = joblib.load(VECTORIZER_PATH) # Adjust as needed +def vulnscan(model, SCAN_PATH, vectorizer): log.info(f"Scanning {SCAN_PATH}") - if os.path.isfile(SCAN_PATH): - result, probability = scan_file(model, vectorizer, SCAN_PATH) - log.info(f"File {SCAN_PATH} is {'sensitive' if result else 'not sensitive'} with probability {probability:.2f}") - with open("Sensitive_File_Paths.txt", "w") as sensitive_file: + result, probability, reason = scan_file(model, vectorizer, SCAN_PATH) + if result: + log.info(f"File {SCAN_PATH} is sensitive with probability {probability:.2f}. 
Reason: {reason}") + if not os.path.exists("Sensitive_File_Paths.txt"): + with open("Sensitive_File_Paths.txt", "w") as sensitive_file: + sensitive_file.write(f"{SCAN_PATH}\n\n") + with open("Sensitive_File_Paths.txt", "a") as sensitive_file: sensitive_file.write(f"{SCAN_PATH}\n") - elif os.path.isdir(SCAN_PATH): - results = scan_directory(model, vectorizer, SCAN_PATH) - with open("Sensitive_File_Paths.txt", "w") as sensitive_file: - for file_path, (is_sensitive_main, probability) in results.items(): - log.info(f"File {file_path} is {'sensitive' if is_sensitive_main else 'not sensitive'} with probability {probability:.2f}") - if is_sensitive_main: - sensitive_file.write(f"{file_path}\n") - else: - log.error("Invalid path provided. Please provide a valid file or directory path.") - exit(1) -def scan_path(model_path: str, scan_paths: str, vectorizer_path: str): - """ - Scan the specified path using the provided model and vectorizer. - - Args: - model_path (str): Path to the model file. - scan_paths (str): Path to the file or directory to be scanned. - vectorizer_path (str): Path to the vectorizer file. - """ - main(model_path, scan_paths, vectorizer_path) - - -log.warning("Starting scan - This may take hours!!") +# Start scanning +log.info("Getting paths to scan - This may take some time!!") threads = [] -paths = [ +paths = [] +base_paths = [ "C:\\Users\\", "C:\\Windows\\Logs", "C:\\Program Files", "C:\\Program Files (x86)" ] +for base_path in base_paths: + for root, dirs, files_main in os.walk(base_path): + for file_main in files_main: + paths.append(os.path.join(root, file_main)) + +# Start scanning +log.warning("Starting scan - This may take hours and consume memory!!") + for path in paths: thread = threading.Thread(target=scan_path, args=("VulnScan/Model SenseMini .3n3.pth", path, "VulnScan/Vectorizer .3n3.pkl")) diff --git a/PLANS.md b/PLANS.md index 39cc6f5..cf9a1db 100644 --- a/PLANS.md +++ b/PLANS.md @@ -5,12 +5,13 @@ > - ❌ = Might be done, Not sure yet > - ✅ = Will be done, 100% sure -| Task | Version | Might or Will be done? | -|---------------------------------------------------------------------------------------------------------------------------------|---------|------------------------| -| Add a tool to capture and analyse memory dumps, which can help in forensic investigations. | v3.1.0 | ❌ | -| Add a tool to capture and analyse network traffic, which can help in forensic investigations. | v3.1.0 | ❌ | -| Remove EXTRA dir, and zip features with custom proper features from Logicytics, as well as remove EXTRA wrapper | v3.2.0 | ❌ | -| Implement a parser for Windows Prefetch files, Shellbags, Jump Lists, LNK files to extract data | v3.3a.0 | ✅ | -| Implement a parser for Windows UserAssist registry key, SRUM database to extract data. | v3.3b.0 | ✅ | -| Implement a parser for Windows Volume Shadow Copy, LSA Secrets, Syscache, Shimcache, Amcache Event Tracing logs to extract data | v3.3c.0 | ✅ | -| Implement the 2 missing flags | v3.4.0 | ✅ | +| Task | Version | Might or Will be done? 
| +|---------------------------------------------------------------------------------------------------------------------------------|----------------|------------------------| +| Remove EXTRA dir, and zip features with custom proper features from Logicytics, as well as remove EXTRA wrapper | v3.2.0 | ❌ | +| Remove deprecated feature: `_train.py` | v3.2.0 | ❌ | +| Implement a parser for Windows Prefetch files, Shellbags, Jump Lists, LNK files to extract data | snapshot-3.3.a | ✅ | +| Implement a parser for Windows UserAssist registry key, SRUM database to extract data. | snapshot-3.3.b | ✅ | +| Implement a parser for Windows Volume Shadow Copy, LSA Secrets, Syscache, Shimcache, Amcache Event Tracing logs to extract data | snapshot-3.3.c | ✅ | +| Implement the 2 missing flags | v3.4.0 | ✅ | +| Remove deprecated feature: `_generate_data.py` | v3.4.0 | ✅ | +| Move VulnScan tools and v3 module to separate repository, keep only the model and vectorizer | v3.5.0 | ✅ | diff --git a/README.md b/README.md index d1597ac..9d6f495 100644 --- a/README.md +++ b/README.md @@ -282,26 +282,27 @@ Here are some of the data points that Logicytics extracts: > [!TIP] > You can check out future plans [here](PLANS.md), you can contribute these plans if you have no idea's on what to contribute! -| File Name | About | Important Note | -|--------------------------|------------------------------------------------------------------------------------------------------------------------|---------------------------------| -| browser_miner.ps1 | Mines all data related to browsers | Would love to be updated | -| cmd_commands.py | Gets data from driverquery, sysinfo, gpresult and more | | -| log_miner.py | Gets all logs from the Windows device | | -| media_backup.py | Gets all media of the device in a neat folder | Would love to be updated | -| netadapter.ps1 | Runs Get-NetAdapter Command with many flags | | -| property_scraper.ps1 | Gets all the windows properties | | -| registry.py | Backups the registry | | -| sensitive_data_miner.py | Copies all files that can be considered sensitive in a neat folder, , very slow and clunky - useful for depth scanning | | -| ssh_miner.py | Gets as much ssh private data as possible | | -| sys_internal.py | Attempts to use the Sys_Internal Suite from microsoft | | -| tasklist.py | Gets all running tasks, PID and info/data | | -| tree.ps1 | Runs and logs the tree.ps1 command, very slow and clunky - useful for depth scanning | | -| window_feature_miner.ps1 | Logs all the windows features enabled | | -| wmic.py | Logs and runs many wmic commands to gain sensitive data and information | | -| wifi_stealer.py | Gets the SSID and Password of all saved Wi-Fi | | -| dir_list.py | Produces a txt on every single file on the device, very slow and clunky - useful for depth scanning | | -| event_logs.py | Produces a multiple txt files in a folder on many event logs (Security, Applications and System) | | -| vulnscan.py | Uses AI/ML to detect sensitive files, and log their paths | In beta! 
|
+| File Name                | About                                                                                                                  | Important Note           |
+|--------------------------|------------------------------------------------------------------------------------------------------------------------|--------------------------|
+| browser_miner.ps1 | Mines all data related to browsers | |
+| cmd_commands.py | Gets data from driverquery, sysinfo, gpresult and more | |
+| log_miner.py | Gets all logs from the Windows device | |
+| media_backup.py | Gets all media of the device in a neat folder | Would love to be updated |
+| netadapter.ps1 | Runs the Get-NetAdapter command with many flags | |
+| property_scraper.ps1 | Gets all the Windows properties | |
+| registry.py | Backs up the registry | |
+| sensitive_data_miner.py | Copies all files that can be considered sensitive into a neat folder, very slow and clunky - useful for depth scanning | |
+| ssh_miner.py | Gets as much SSH private data as possible | |
+| sys_internal.py | Attempts to use the Sys_Internal Suite from Microsoft | |
+| tasklist.py | Gets all running tasks, PIDs and info/data | |
+| tree.ps1 | Runs and logs the tree command, very slow and clunky - useful for depth scanning | |
+| window_feature_miner.ps1 | Logs all the enabled Windows features | |
+| wmic.py | Logs and runs many wmic commands to gain sensitive data and information | |
+| wifi_stealer.py | Gets the SSID and password of all saved Wi-Fi networks | |
+| dir_list.py | Produces a txt listing every single file on the device, very slow and clunky - useful for depth scanning | |
+| event_logs.py | Produces multiple txt files in a folder covering many event logs (Security, Application and System) | |
+| vulnscan.py | Uses AI/ML to detect sensitive files and log their paths | In beta! |
+| dump_memory.py | Dumps some memory and logs some RAM details | |
 This is not an exhaustive list, but it should give you a good idea of what data Logicytics is capable of extracting.
diff --git a/requirements.txt b/requirements.txt
index ce546ac..137a2a8 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,19 +1,23 @@
+configobj~=5.0.9
+joblib~=1.3.2
+matplotlib~=3.8.4
+torch~=2.5.1+cu124
+xgboost~=2.1.3
+scikit-learn~=1.5.2
+Faker~=30.3.0
+numpy~=1.26.4
+transformers~=4.38.2
 requests~=2.32.3
 psutil~=6.1.0
-colorlog~=6.9.0
 DateTime~=5.5
 pathlib~=1.0.1
+colorlog~=6.9.0
+safetensors~=0.4.5
 prettytable~=3.12.0
-scikit-learn~=1.5.2
-joblib~=1.3.2
-matplotlib~=3.8.4
-numpy~=1.26.4
-Faker~=30.3.0
-transformers~=4.38.2
-xgboost~=2.1.3
 pandas~=2.2.2
 networkx~=3.2.1
 scapy~=2.5.0
-safetensors~=0.4.2
+seaborn~=0.13.2
+torchviz~=0.0.3
+plotly~=5.24.1
 tqdm~=4.66.6
-configobj~=5.0.9
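The requirements now pin a CUDA build of torch (`torch~=2.5.1+cu124`), while the trainer above falls back to CPU when CUDA raises at runtime. A small, hedged check for whether the installed build actually reaches a GPU before enabling `use_cuda` in `config.ini`:

```python
# Hedged sketch: quick check that the pinned CUDA build of torch can actually
# reach a GPU before setting use_cuda = true in config.ini.
import torch

print(f"torch {torch.__version__}, built against CUDA {torch.version.cuda}")
if torch.cuda.is_available():
    print(f"GPU available: {torch.cuda.get_device_name(0)}")
else:
    print("No usable GPU detected - training will fall back to CPU")
```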