Replace Queue Worker with ThreadPoolExecutor based TasksQueue

This commit is contained in:
2020-07-27 08:22:28 +02:00
parent 39a15e6b28
commit cb6f234b02
10 changed files with 81 additions and 41 deletions

View File

@@ -8,6 +8,7 @@ import com.bartlomiejpluta.ttsserver.core.lua.sandbox.SandboxFactory
import com.bartlomiejpluta.ttsserver.core.web.endpoint.DefaultEndpoint
import com.bartlomiejpluta.ttsserver.core.web.endpoint.Endpoint
import com.bartlomiejpluta.ttsserver.core.web.endpoint.QueuedEndpoint
import com.bartlomiejpluta.ttsserver.core.web.queue.TasksQueueFactory
import com.bartlomiejpluta.ttsserver.core.web.uri.UriTemplate
import com.bartlomiejpluta.ttsserver.ui.main.MainActivity
import fi.iki.elonen.NanoHTTPD.Method
@@ -17,7 +18,8 @@ import java.io.File
class EndpointLoader(
private val context: Context,
private val sandboxFactory: SandboxFactory
private val sandboxFactory: SandboxFactory,
private val tasksQueueFactory: TasksQueueFactory
) {
fun loadEndpoints(): List<Endpoint> {
@@ -57,18 +59,19 @@ class EndpointLoader(
}
private fun createDefaultEndpoint(luaTable: LuaTable): Endpoint = DefaultEndpoint(
consumer = parseConsumer(luaTable),
uri = parseUri(luaTable),
method = parseMethod(luaTable),
accepts = parseAccepts(luaTable),
consumer = parseConsumer(luaTable)
accepts = parseAccepts(luaTable)
)
private fun createQueuedEndpoint(luaTable: LuaTable): Endpoint = QueuedEndpoint(
context = context,
queue = tasksQueueFactory.create(),
consumer = parseConsumer(luaTable),
uri = parseUri(luaTable),
method = parseMethod(luaTable),
accepts = parseAccepts(luaTable),
consumer = parseConsumer(luaTable)
accepts = parseAccepts(luaTable)
)
private fun parseUri(luaTable: LuaTable) = luaTable.get("path").checkjstring()

View File

@@ -10,10 +10,10 @@ import java.io.File
import java.io.FileInputStream
class DefaultEndpoint(
private val consumer: LuaClosure,
uri: UriTemplate,
accepts: String?,
method: Method,
private val consumer: LuaClosure
accepts: String?
) : AbstractEndpoint(uri, accepts, method) {
override fun safeHit(request: Request) = request.luaTable

View File

@@ -2,31 +2,29 @@ package com.bartlomiejpluta.ttsserver.core.web.endpoint
import android.content.Context
import com.bartlomiejpluta.ttsserver.core.web.dto.Request
import com.bartlomiejpluta.ttsserver.core.web.queue.TasksQueue
import com.bartlomiejpluta.ttsserver.core.web.task.QueueableTask
import com.bartlomiejpluta.ttsserver.core.web.uri.UriTemplate
import com.bartlomiejpluta.ttsserver.core.web.worker.Worker
import fi.iki.elonen.NanoHTTPD
import fi.iki.elonen.NanoHTTPD.newFixedLengthResponse
import org.luaj.vm2.LuaClosure
import java.util.concurrent.LinkedBlockingQueue
class QueuedEndpoint(
context: Context,
private val context: Context,
private val queue: TasksQueue,
private val consumer: LuaClosure,
uri: UriTemplate,
accepts: String?,
method: NanoHTTPD.Method,
consumer: LuaClosure
accepts: String?
) : AbstractEndpoint(uri, accepts, method) {
private val queue = LinkedBlockingQueue<Request>()
private val worker = Thread(Worker(context, queue, consumer)).also { it.name = uri.template }
override fun safeHit(request: Request): NanoHTTPD.Response? {
queue.add(request)
val task = QueueableTask(context, consumer, request, queue)
queue.push(task)
return newFixedLengthResponse(NanoHTTPD.Response.Status.ACCEPTED, "text/plain", "")
}
fun runWorker() = worker.start()
fun stopWorker() = worker.interrupt()
fun shutdownQueue() = queue.shutdown()
override fun toString() = "Q[${uri.template}]"
}

View File

@@ -0,0 +1,9 @@
package com.bartlomiejpluta.ttsserver.core.web.queue
import com.bartlomiejpluta.ttsserver.core.web.task.QueueableTask
interface TasksQueue {
fun push(queueableTask: QueueableTask)
fun shutdown()
val size: Int
}

View File

@@ -0,0 +1,5 @@
package com.bartlomiejpluta.ttsserver.core.web.queue
class TasksQueueFactory {
fun create() = ThreadPoolBasedTasksQueue()
}

View File

@@ -0,0 +1,22 @@
package com.bartlomiejpluta.ttsserver.core.web.queue
import com.bartlomiejpluta.ttsserver.core.web.task.QueueableTask
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
class ThreadPoolBasedTasksQueue : TasksQueue {
private val executor = ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, LinkedBlockingQueue())
override fun push(queueableTask: QueueableTask) {
executor.execute(queueableTask)
}
override fun shutdown() {
executor.shutdown()
executor.awaitTermination(5L, TimeUnit.SECONDS)
}
override val size: Int
get() = executor.queue.size
}

View File

@@ -75,7 +75,6 @@ class WebServer(
override fun start() {
super.start()
queuedEndpoints.forEach { it.runWorker() }
LocalBroadcastManager
.getInstance(context)
@@ -86,7 +85,7 @@ class WebServer(
override fun stop() {
super.stop()
queuedEndpoints.forEach { it.stopWorker() }
queuedEndpoints.forEach { it.shutdownQueue() }
LocalBroadcastManager
.getInstance(context)

View File

@@ -1,38 +1,33 @@
package com.bartlomiejpluta.ttsserver.core.web.worker
package com.bartlomiejpluta.ttsserver.core.web.task
import android.content.Context
import android.content.Intent
import androidx.localbroadcastmanager.content.LocalBroadcastManager
import com.bartlomiejpluta.R
import com.bartlomiejpluta.ttsserver.core.web.dto.Request
import com.bartlomiejpluta.ttsserver.service.foreground.ForegroundService
import com.bartlomiejpluta.ttsserver.service.state.ServiceState
import com.bartlomiejpluta.ttsserver.core.web.queue.TasksQueue
import com.bartlomiejpluta.ttsserver.ui.main.MainActivity
import org.luaj.vm2.LuaClosure
import org.luaj.vm2.LuaError
import org.luaj.vm2.LuaInteger
import org.luaj.vm2.LuaValue
import org.luaj.vm2.lib.ZeroArgFunction
import java.util.concurrent.BlockingQueue
class Worker(
class QueueableTask(
private val context: Context,
private val queue: BlockingQueue<Request>,
private val consumer: LuaClosure
private val consumer: LuaClosure,
private val request: Request,
private val queue: TasksQueue
) : Runnable {
override fun run() = try {
while (ForegroundService.state == ServiceState.RUNNING) {
consume(queue.take())
}
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
}
private fun consume(request: Request) = try {
override fun run() {
try {
consumer.call(request.luaTable, QueueSizeFunction(queue))
} catch (e: LuaError) {
handleLuaError(e)
}
}
private fun handleLuaError(exception: LuaError) {
LocalBroadcastManager
@@ -43,7 +38,7 @@ class Worker(
})
}
class QueueSizeFunction(private val queue: BlockingQueue<Request>) : ZeroArgFunction() {
class QueueSizeFunction(private val queue: TasksQueue) : ZeroArgFunction() {
override fun call(): LuaInteger = LuaValue.valueOf(queue.size)
}
}

View File

@@ -8,6 +8,7 @@ import com.bartlomiejpluta.ttsserver.core.lua.loader.EndpointLoader
import com.bartlomiejpluta.ttsserver.core.lua.sandbox.SandboxFactory
import com.bartlomiejpluta.ttsserver.core.tts.engine.TTSEngine
import com.bartlomiejpluta.ttsserver.core.util.NetworkUtil
import com.bartlomiejpluta.ttsserver.core.web.queue.TasksQueueFactory
import com.bartlomiejpluta.ttsserver.initializer.ScriptsInitializer
import dagger.Module
import dagger.Provides
@@ -18,8 +19,12 @@ class LuaModule {
@Provides
@Singleton
fun endpointLoader(context: Context, sandboxFactory: SandboxFactory) =
EndpointLoader(context, sandboxFactory)
fun endpointLoader(
context: Context,
sandboxFactory: SandboxFactory,
tasksQueueFactory: TasksQueueFactory
) =
EndpointLoader(context, sandboxFactory, tasksQueueFactory)
@Provides
@Singleton
@@ -45,6 +50,10 @@ class LuaModule {
sonosLibrary
)
@Provides
@Singleton
fun tasksQueueFactory() = TasksQueueFactory()
@Provides
@Singleton
fun threadLibrary() = ThreadLibrary()

View File

@@ -62,7 +62,7 @@ class TTSModule {
@Provides
@Singleton
fun adudioConverter(context: Context) = AudioConverter(context)
fun audioConverter(context: Context) = AudioConverter(context)
@Provides
@Singleton