Skip to content

Commit

Permalink
Moving dataspace file migration to the background (#1386)
Browse files Browse the repository at this point in the history
Change errors to be marked in the snapshot instead of being raised

Still need to refactor the last set of errors not to be raised (They are also untested)

refs #1372
  • Loading branch information
carolyncole authored Aug 1, 2023
1 parent a1ce6ec commit 3cbb67c
Show file tree
Hide file tree
Showing 8 changed files with 258 additions and 68 deletions.
101 changes: 101 additions & 0 deletions app/jobs/dspace_bitstream_copy_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# frozen_string_literal: true
class DspaceBitstreamCopyJob < ApplicationJob
queue_as :default

def perform(dspace_files_json:, work_id:, migration_snapshot_id:)
dspace_files = JSON.parse(dspace_files_json).map { |json_file| S3File.from_json(json_file) }
work = Work.find(work_id)

dspace_files = download_dspace_bitstreams(dspace_files, work, migration_snapshot_id)
upload_dspace_files(dspace_files, work, migration_snapshot_id)

dspace_files.each { |file| File.delete(file.filename) }
end

private

def download_dspace_bitstreams(dspace_files, work, migration_snapshot_id)
dspace_connector = PULDspaceConnector.new(work)

downloaded_files = dspace_connector.download_bitstreams(dspace_files)
if downloaded_files.any?(Hash)
error_files = downloaded_files.select { |file| file.is_a? Hash }
update_migration_status(migration_snapshot_id) do |migration_snapshot|
error_files.each do |error_file|
migration_snapshot.mark_error(error_file[:file], error_file[:error])
end
end
dspace_files = downloaded_files.reject { |file| file.is_a? Hash }
end
dspace_files
end

def upload_dspace_files(dspace_files, work, migration_snapshot_id)
aws_connector = PULDspaceAwsConnector.new(work, work.doi)

results = aws_connector.upload_to_s3(dspace_files)
error_results = results.select { |result| result[:error].present? }
good_results = results.select { |result| result[:key].present? }
update_migration_status(migration_snapshot_id) do |migration_snapshot|
good_results.each do |result|
migration_snapshot.mark_complete(result[:file])
end
error_results.each do |result|
migration_snapshot.mark_error(result[:file], result[:error])
end
end
@migration_snapshot&.save
error_results.map { |result| result[:error] }
end

def update_migration_status(migration_snapshot_id)
migration_snapshot = MigrationUploadSnapshot.find(migration_snapshot_id)
migration_snapshot.with_lock do
migration_snapshot.reload
yield migration_snapshot
migration_snapshot.save!
end
end

def download_bitstream(retrieval_path, filename)
url = "#{Rails.configuration.dspace.base_url}#{retrieval_path}"
path = File.join(Rails.configuration.dspace.download_file_path, "dspace_download", work.id.to_s)
FileUtils.mkdir_p path
download_file(url, filename)
filename
end

def download_file(url, filename)
http = request_http(url)
uri = URI(url)
req = Net::HTTP::Get.new uri.path
http.request req do |response|
io = File.open(filename, "w")
response.read_body do |chunk|
io.write chunk.force_encoding("UTF-8")
end
io.close
end
end

def checksum_file(filename, original_checksum)
checksum = Digest::MD5.file(filename)
base64 = checksum.base64digest
if base64 != original_checksum
msg = "Mismatching checksum #{filename} #{original_checksum} for work: #{work.id} doi: #{work.doi} ark: #{work.ark}"
Rails.logger.error msg
Honeybadger.notify(msg)
false
else
Rails.logger.debug "Matching checksums for #{filename}"
true
end
end

def request_http(url)
uri = URI(url)
http = Net::HTTP.new(uri.host, uri.port)
http.use_ssl = true
http
end
end
31 changes: 26 additions & 5 deletions app/models/migration_upload_snapshot.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,31 @@ def store_files(s3_files, pre_existing_files: [])
files.concat pre_existing_files if pre_existing_files.present?
end

def mark_error(s3_file, error_message)
index = find_file(s3_file.filename_display)
if index.present?
files[index]["migrate_status"] = "error"
files[index]["migrate_error"] = error_message
end
end

def mark_complete(s3_file)
index = files.index { |file| file["filename"] == s3_file.filename && file["migrate_status"] == "started" }
if index.nil?
Honeybadger.notify("Migrated a file that was not part of the orginal Migration: #{id} for work #{work_id}: #{s3_file.filename}")
else
index = find_file(s3_file.filename)
if index.present?
files[index]["migrate_status"] = "complete"
finalize_migration if migration_complete?
end
finalize_migration if migration_complete?
end

def migration_complete?
files.select { |file| file.keys.include?("migrate_status") }.map { |file| file["migrate_status"] }.uniq == ["complete"]
end

def migration_complete_with_errors?
return false if migration_complete?
!files.select { |file| file.keys.include?("migrate_status") }.map { |file| file["migrate_status"] }.uniq.include?("started")
end

def existing_files
super.select { |file| file["migrate_status"].nil? || file["migrate_status"] == "complete" }
end
Expand All @@ -37,4 +48,14 @@ def finalize_migration
migration.created_by_user_id, activity_type: WorkActivity::MIGRATION_COMPLETE)
end
end

private

def find_file(filename)
index = files.index { |file| file["filename"] == filename && file["migrate_status"] == "started" }
if index.nil?
Honeybadger.notify("Migrated a file that was not part of the orginal Migration: #{id} for work #{work_id}: #{filename}")
end
index
end
end
24 changes: 21 additions & 3 deletions app/models/s3_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,27 @@ class S3File
alias key filename
alias id filename

def initialize(filename:, last_modified:, size:, checksum:, work:)
def self.from_json(json_string)
json = if json_string.is_a? String
JSON.parse(json_string)
else
json_string
end

new(filename: json["filename"], last_modified: DateTime.parse(json["last_modified"]), size: json["size"],
checksum: json["checksum"], work: Work.find(json["work_id"]), filename_display: json["filename_display"], url: json["url"])
end

def initialize(filename:, last_modified:, size:, checksum:, work:, filename_display: nil, url: nil)
@safe_id = filename_as_id(filename)
@filename = filename
@filename_display = filename_short(work, filename)
@filename_display = filename_display || filename_short(work, filename)
@last_modified = last_modified
@last_modified_display = last_modified.in_time_zone.strftime("%m/%d/%Y %I:%M %p") # mm/dd/YYYY HH:MM AM
@size = size
@display_size = number_to_human_size(size)
@checksum = checksum.delete('"')
@url = work_download_path(work, filename: filename)
@url = url || work_download_path(work, filename: filename)
@work = work
end

Expand Down Expand Up @@ -81,6 +92,13 @@ def snapshots
persisted
end

def to_json
{
filename: filename, last_modified: last_modified, size: size,
checksum: checksum, work_id: @work.id, filename_display: filename_display, url: url
}.to_json
end

private

# Filename without the DOI/work-id/ in the path (but we preserve other path information if there is any)
Expand Down
49 changes: 30 additions & 19 deletions app/services/pul_dspace_connector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,27 @@ def metadata
end
end

def download_bitstreams
bitstreams.map do |bitstream|
filename = download_bitstream(bitstream["retrieveLink"], bitstream["name"])
if checksum_file(filename, bitstream)
S3File.new(filename: filename, checksum: bitstream["checkSum"]["base64"], last_modified: DateTime.now, size: -1, work: work)
def list_bitsteams
@list_bitsteams ||=
bitstreams.map do |bitstream|
path = File.join(Rails.configuration.dspace.download_file_path, "dspace_download", work.id.to_s)
filename = File.join(path, bitstream["name"])
if bitstream["checkSum"]["checkSumAlgorithm"] != "MD5"
Honeybadger.notify("Unknown checksum algorithm #{bitstream['checkSum']['checkSumAlgorithm']} #{filename} #{bitstream}")
end

S3File.new(filename_display: bitstream["name"], checksum: base64digest(bitstream["checkSum"]["value"]), last_modified: DateTime.now,
size: -1, work: work, url: bitstream["retrieveLink"], filename: filename)
end
end

def download_bitstreams(bitstream_list)
bitstream_list.map do |file|
filename = download_bitstream(file.url, file.filename)
if checksum_file(filename, file.checksum)
file
else
{ file: file, error: "Checsum Missmatch" }
end
end
end
Expand Down Expand Up @@ -65,10 +81,9 @@ def get_data(url_path)
JSON.parse(response.body)
end

def download_bitstream(retrieval_path, name)
def download_bitstream(retrieval_path, filename)
url = "#{Rails.configuration.dspace.base_url}#{retrieval_path}"
path = File.join(Rails.configuration.dspace.download_file_path, "dspace_download", work.id.to_s)
filename = File.join(path, name)
FileUtils.mkdir_p path
download_file(url, filename)
filename
Expand All @@ -87,27 +102,23 @@ def download_file(url, filename)
end
end

# rubocop:disable Metrics/MethodLength
def checksum_file(filename, bitstream)
checksum_class = Digest.const_get(bitstream["checkSum"]["checkSumAlgorithm"])
checksum = checksum_class.file(filename)
hexdigest = checksum.hexdigest
def checksum_file(filename, original_checksum)
checksum = Digest::MD5.file(filename)
base64 = checksum.base64digest
bitstream["checkSum"]["base64"] = base64
if hexdigest != bitstream["checkSum"]["value"]
msg = "Mismatching checksum #{filename} #{bitstream} for work: #{work.id} doi: #{work.doi} ark: #{work.ark}"
if base64 != original_checksum
msg = "Mismatching checksum #{filename} #{original_checksum} for work: #{work.id} doi: #{work.doi} ark: #{work.ark}"
Rails.logger.error msg
Honeybadger.notify(msg)
false
else
Rails.logger.debug "Matching checksums for #{filename}"
true
end
rescue NameError
Honeybadger.notify("Unknown checksum algorithm #{bitstream['checkSum']['checkSumAlgorithm']} #{filename} #{bitstream}")
false
end
# rubocop:enable Metrics/MethodLength

def base64digest(hexdigest)
[[hexdigest].pack("H*")].pack("m0")
end

def request_http(url)
uri = URI(url)
Expand Down
31 changes: 5 additions & 26 deletions app/services/pul_dspace_migrate.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,13 @@ def remove_overlap_and_combine
end
end
@dspace_files = dspace_files - files_to_remove
@dspace_files.each { |file| file_keys << file.filename_display }
dspace_files + aws_files_only
end

def dpsace_update_display_to_final_key
dspace_files.each do |s3_file|
s3_file.filename_display = work.s3_query_service.prefix + File.basename(s3_file.filename)
s3_file.filename_display = work.s3_query_service.prefix + s3_file.filename_display
end
end

Expand All @@ -99,32 +100,10 @@ def check_matching_files(aws_file, dpace_file, files_to_remove)

def migrate_dspace
return if work.skip_dataspace_migration?
@dspace_files = dspace_connector.download_bitstreams
if dspace_files.any?(nil)
bitstreams = dspace_connector.bitstreams
error_files = dspace_files.zip(bitstreams).select { |values| values.first.nil? }.map(&:last)
error_names = error_files.map { |bitstream| bitstream["name"] }.join(", ")
raise "Error downloading file(s) #{error_names}"
end
@dspace_files = dspace_connector.list_bitsteams
generate_migration_snapshot
errors = upload_dspace_files(dspace_files)
if errors.count > 0
raise "Error uploading file(s):\n #{errors.join("\n")}" if errors.count > 0
end

dspace_files.each { |file| File.delete(file.filename) }
end

def upload_dspace_files(dspace_files)
results = aws_connector.upload_to_s3(dspace_files)
error_results = results.select { |result| result[:error].present? }
good_results = results.select { |result| result[:key].present? }
good_results.each do |result|
@migration_snapshot&.mark_complete(result[:file])
file_keys << result[:key]
end
@migration_snapshot&.save
error_results.map { |result| result[:error] }
dspace_files_json = "[#{dspace_files.map(&:to_json).join(',')}]"
DspaceBitstreamCopyJob.perform_later(dspace_files_json: dspace_files_json, work_id: work.id, migration_snapshot_id: migration_snapshot.id)
end

def aws_copy(files)
Expand Down
Binary file added dump.rdb
Binary file not shown.
6 changes: 3 additions & 3 deletions spec/services/pul_dspace_connector_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

describe "#download_bitstreams" do
it "finds no bitstreams" do
expect(dspace_data.download_bitstreams).to be_empty
expect(dspace_data.download_bitstreams(dspace_data.list_bitsteams)).to be_empty
end
end

Expand Down Expand Up @@ -56,7 +56,7 @@

it "downloads the bitstreams" do
allow(Honeybadger).to receive(:notify)
expect(dspace_data.download_bitstreams.count).to eq(3)
expect(dspace_data.download_bitstreams(dspace_data.list_bitsteams).count).to eq(3)
expect(Honeybadger).not_to have_received(:notify)
end

Expand All @@ -66,7 +66,7 @@

it "downloads the bitstreams" do
allow(Honeybadger).to receive(:notify)
expect(dspace_data.download_bitstreams.count).to eq(3)
expect(dspace_data.download_bitstreams(dspace_data.list_bitsteams).count).to eq(3)
expect(Honeybadger).to have_received(:notify).with(/Mismatching checksum .* for work: #{work.id} doi: #{work.doi} ark: #{work.ark}/)
end
end
Expand Down
Loading

0 comments on commit 3cbb67c

Please sign in to comment.