Skip to content

Commit

Permalink
Merge pull request #4 from Bencodes/worker-support
Browse files Browse the repository at this point in the history
  • Loading branch information
Bencodes authored Mar 27, 2023
2 parents 8c85746 + 2987de1 commit 0347b11
Show file tree
Hide file tree
Showing 17 changed files with 366 additions and 64 deletions.
6 changes: 0 additions & 6 deletions lint/internal/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,3 @@ py_binary(
srcs = ["test_runner_executable.py"],
visibility = ["//visibility:public"],
)

filegroup(
name = "test_runner_template",
srcs = ["test_runner_template.bash"],
visibility = ["//visibility:public"],
)
30 changes: 17 additions & 13 deletions lint/internal/defs.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,6 @@ _ATTRS = {
cfg = "exec",
doc = "Test runner executible for validating the output results.",
),
"_test_runner_template": attr.label(
default = "//lint/internal:test_runner_template",
allow_single_file = True,
),
"srcs": attr.label_list(
mandatory = True,
allow_files = [".java", ".kt", ".kts"],
Expand Down Expand Up @@ -251,10 +247,13 @@ def _collect_android_lint_providers(ctx, regenerate):
inputs = inputs,
outputs = outputs,
executable = ctx.executable._lint_wrapper,
execution_requirements = {},
progress_message = "Running Android Lint {}".format(str(ctx.label)),
arguments = [args],
tools = [ctx.executable._lint_wrapper],
execution_requirements = {
"supports-workers": "1",
"requires-worker-protocol": "json",
},
)

return struct(
Expand All @@ -269,15 +268,20 @@ def _test_impl(ctx):
inputs.append(providers.lint_baseline)
inputs.extend(ctx.attr._test_runner_executable.default_runfiles.files.to_list())

ctx.actions.expand_template(
template = ctx.file._test_runner_template,
ctx.actions.write(
output = ctx.outputs.executable,
is_executable = True,
substitutions = {
"{executable}": ctx.executable._test_runner_executable.short_path,
"{lint_baseline}": providers.lint_baseline.short_path,
"{regenerate_baseline_files}": "false",
},
is_executable = False,
content = """
#!/bin/bash
{executable} \
--lint_baseline "{lint_baseline}" \
--regenerate_baseline_files "{regenerate_baseline_files}"
""".format(
executable = ctx.executable._test_runner_executable.short_path,
lint_baseline = providers.lint_baseline.short_path,
regenerate_baseline_files = "false",
),
)

return [
Expand Down
4 changes: 2 additions & 2 deletions lint/internal/dependencies.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ def rules_android_lint_dependencies():
maybe(
repo_rule = http_archive,
name = "rules_java",
url = "https://github.com/bazelbuild/rules_java/releases/download/5.4.0/rules_java-5.4.0.tar.gz",
sha256 = "9b87757af5c77e9db5f7c000579309afae75cf6517da745de01ba0c6e4870951",
url = "https://github.com/bazelbuild/rules_java/releases/download/5.4.1/rules_java-5.4.1.tar.gz",
sha256 = "a1f82b730b9c6395d3653032bd7e3a660f9d5ddb1099f427c1e1fe768f92e395",
)

maybe(
Expand Down
5 changes: 0 additions & 5 deletions lint/internal/test_runner_template.bash

This file was deleted.

11 changes: 8 additions & 3 deletions lint/internal/toolchains.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,16 @@ def rules_android_lint_toolchains(lint_version = _LINT_VERSION):
maven_install(
name = "rules_android_lint_dependencies",
artifacts = [
# Worker dependencies
maven.artifact("com.squareup.moshi", "moshi", "1.14.0"),
maven.artifact("com.squareup.moshi", "moshi-kotlin", "1.14.0"),
maven.artifact("com.squareup.okio", "okio-jvm", "3.2.0"),
maven.artifact("io.reactivex.rxjava3", "rxjava", "3.0.10"),
maven.artifact("com.xenomachina", "kotlin-argparser", "2.0.7"),
# Testing dependencies
maven.artifact("junit", "junit", "4.13.2", testonly = True),
maven.artifact("org.assertj", "assertj-core", "3.24.2", testonly = True),
"io.reactivex.rxjava3:rxjava:3.0.10",
"com.xenomachina:kotlin-argparser:2.0.7",
# TODO These need to be passed in via the toolchain and dynamically
# These need to be passed in via the toolchain and dynamically
"com.android.tools.lint:lint:{}".format(lint_version),
"com.android.tools.lint:lint-api:{}".format(lint_version),
"com.android.tools.lint:lint-checks:{}".format(lint_version),
Expand Down
14 changes: 14 additions & 0 deletions lint/internal/worker/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
load("@io_bazel_rules_kotlin//kotlin:jvm.bzl", "kt_jvm_library")

kt_jvm_library(
name = "worker",
srcs = glob(["*.kt"]),
visibility = ["//visibility:public"],
deps = [
"@rules_android_lint_dependencies//:com_squareup_moshi_moshi",
"@rules_android_lint_dependencies//:com_squareup_moshi_moshi_kotlin",
"@rules_android_lint_dependencies//:com_squareup_okio_okio_jvm",
"@rules_android_lint_dependencies//:io_reactivex_rxjava3_rxjava",
"@rules_android_lint_dependencies//:org_reactivestreams_reactive_streams",
],
)
10 changes: 10 additions & 0 deletions lint/internal/worker/InvocationWorker.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
internal class InvocationWorker(
private val args: Array<String>,
private val workerMessageProcessor: Worker.WorkRequestCallback,
) : Worker {

override fun processRequests(): Int {
// Handle a single work request
return workerMessageProcessor.processWorkRequest(args.toList(), System.err)
}
}
115 changes: 115 additions & 0 deletions lint/internal/worker/PersistentWorker.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Scheduler
import io.reactivex.rxjava3.schedulers.Schedulers
import java.io.BufferedOutputStream
import java.io.ByteArrayOutputStream
import java.io.IOException
import java.io.PrintStream

internal class PersistentWorker(
/**
* WorkerIO instance wrapping the standard output streams
*/
private val workerIO: WorkerIO,

/**
* Rxjava Scheduler to execute work requests on.
*/
private val scheduler: Scheduler,

/**
* Instance of CpuTimeBasedGcScheduler that will run periodically
*/
private val persistentWorkerCpuTimeBasedGcScheduler: PersistentWorkerCpuTimeBasedGcScheduler,

/**
* Instance of CpuTimeBasedGcScheduler that will run periodically
*/
private val workRequestProcessor: Worker.WorkerMessageProcessor,

/**
* Instance of CpuTimeBasedGcScheduler that will run periodically
*/
private val workerWorkRequestCallback: Worker.WorkRequestCallback,
) : Worker {

constructor(
workerMessageProcessor: Worker.WorkRequestCallback,
) : this(
workerIO = WorkerIO(),
scheduler = Schedulers.io(),
persistentWorkerCpuTimeBasedGcScheduler = PersistentWorkerCpuTimeBasedGcScheduler(),
workRequestProcessor = WorkerJsonMessageProcessor(
System.`in`,
System.out,
),
workerWorkRequestCallback = workerMessageProcessor,
)

/**
* Initiate the worker and begin processing work requests
*/
override fun processRequests(): Int {
return workerIO.use { io ->
// Start by redirecting the system streams so that nothing
// corrupts the streams that the worker uses
io.redirectSystemStreams()

// Process requests as they come in using RxJava
Flowable.create(
{ emitter ->
while (!emitter.isCancelled) {
try {
val request: WorkRequest = workRequestProcessor.readWorkRequest()
emitter.onNext(request)
} catch (e: IOException) {
emitter.onError(e)
}
}
},
BackpressureStrategy.BUFFER,
).subscribeOn(scheduler).parallel().runOn(scheduler)
// Execute the work and map the result to a work response
.map { request -> return@map this.respondToRequest(request) }
// Run the garbage collector periodically so that we are a good responsible worker
.doOnNext { persistentWorkerCpuTimeBasedGcScheduler.maybePerformGc() }
.doOnError { it.printStackTrace() }.sequential().observeOn(scheduler)
.blockingSubscribe { response ->
workRequestProcessor.writeWorkResponse(response)
}
return@use 0
}
}

private fun respondToRequest(request: WorkRequest): WorkResponse {
ByteArrayOutputStream().use { baos ->
// Create a print stream that the execution can write logs to
val printStream = PrintStream(BufferedOutputStream(ByteArrayOutputStream()))
var exitCode: Int
try {
// Sanity check the work request arguments
val arguments =
requireNotNull(request.arguments) { "Request with id ${request.requestId} does not have arguments!" }
require(arguments.isNotEmpty()) { "Request with id ${request.requestId} does not have arguments!" }
exitCode = workerWorkRequestCallback.processWorkRequest(arguments, printStream)
} catch (e: Exception) {
e.printStackTrace(printStream)
exitCode = 1
} finally {
printStream.flush()
}

val output = arrayOf(baos.toString())
.asSequence()
.map { it.trim() }
.filter { it.isNotEmpty() }
.joinToString("\n")
return WorkResponse(
exitCode = exitCode,
output = output,
requestId = request.requestId,
)
}
}
}
40 changes: 40 additions & 0 deletions lint/internal/worker/PersistentWorkerCpuTimeBasedGcScheduler.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import com.sun.management.OperatingSystemMXBean
import java.lang.management.ManagementFactory
import java.time.Duration
import java.util.concurrent.atomic.AtomicReference

internal class PersistentWorkerCpuTimeBasedGcScheduler(
/**
* After this much CPU time has elapsed, we may force a GC run. Set to [Duration.ZERO] to
* disable.
*/
private val cpuUsageBeforeGc: Duration = Duration.ofSeconds(10),
) {

private val cpuTime: Duration
get() = if (!cpuUsageBeforeGc.isZero) Duration.ofNanos(bean.processCpuTime) else Duration.ZERO

/** The total process CPU time at the last GC run (or from the start of the worker). */
private val cpuTimeAtLastGc: AtomicReference<Duration> = AtomicReference(cpuTime)

/** Call occasionally to perform a GC if enough CPU time has been used. */
fun maybePerformGc() {
if (!cpuUsageBeforeGc.isZero) {
val currentCpuTime = cpuTime
val lastCpuTime = cpuTimeAtLastGc.get()
// Do GC when enough CPU time has been used, but only if nobody else beat us to it.
if (currentCpuTime.minus(lastCpuTime) > cpuUsageBeforeGc
&& cpuTimeAtLastGc.compareAndSet(lastCpuTime, currentCpuTime)
) {
System.gc()
// Avoid counting GC CPU time against CPU time before next GC.
cpuTimeAtLastGc.compareAndSet(currentCpuTime, cpuTime)
}
}
}

companion object {
/** Used to get the CPU time used by this process. */
private val bean = ManagementFactory.getOperatingSystemMXBean() as OperatingSystemMXBean
}
}
15 changes: 15 additions & 0 deletions lint/internal/worker/WorkRequest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import com.squareup.moshi.Json

data class WorkRequest(
/**
* Request ID associated with the work request
*/
@Json(name = "requestId")
val requestId: Int = 0,

/**
* The work request arguments
*/
@Json(name = "arguments")
val arguments: List<String>,
)
21 changes: 21 additions & 0 deletions lint/internal/worker/WorkResponse.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import com.squareup.moshi.Json

data class WorkResponse(
/**
* The request ID for the work request
*/
@Json(name = "requestId")
val requestId: Int,

/**
* Exit status for the work request
*/
@Json(name = "exitCode")
val exitCode: Int,

/**
* Standard output that was collected during the work request
*/
@Json(name = "output")
val output: String,
)
43 changes: 43 additions & 0 deletions lint/internal/worker/Worker.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import java.io.IOException
import java.io.PrintStream

interface Worker {

fun processRequests(): Int

interface WorkRequestCallback {

/**
* Processes an individual work request.
*/
fun processWorkRequest(args: List<String>, printStream: PrintStream): Int
}

interface WorkerMessageProcessor {

@Throws(IOException::class)
fun readWorkRequest(): WorkRequest

@Throws(IOException::class)
fun writeWorkResponse(workResponse: WorkResponse)
}

companion object {

/**
* Creates the appropriate worker instance using the provided worker arguments.
*
* If `--persistent_worker` exists in the arguments, an instance of PersistentWorker will
* be returned. Otherwise an instance of InvocationWorker will be returned.
*/
fun fromArgs(
args: Array<String>,
workerMessageProcessor: WorkRequestCallback,
): Worker {
return when {
"--persistent_worker" in args -> PersistentWorker(workerMessageProcessor)
else -> InvocationWorker(args, workerMessageProcessor)
}
}
}
}
18 changes: 18 additions & 0 deletions lint/internal/worker/WorkerIO.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import java.io.InputStream
import java.io.PrintStream

internal class WorkerIO(
val input: InputStream = System.`in`,
val output: PrintStream = System.out,
val err: PrintStream = System.err,
) : AutoCloseable {

fun redirectSystemStreams(): WorkerIO {
System.setOut(err)
return this
}

override fun close() {
System.setOut(output)
}
}
Loading

0 comments on commit 0347b11

Please sign in to comment.