From 8f6eed8bab43b36c072500ceaf7313f5b39c9fd5 Mon Sep 17 00:00:00 2001 From: "A.Klivtsov" Date: Sat, 16 May 2026 12:36:38 +0300 Subject: [PATCH] gateway added --- .idea/gradle.xml | 1 + gateway/build.gradle | 36 ++++ gateway/src/main/kotlin/Main.kt | 25 +++ .../src/main/kotlin/core/GatewayManager.kt | 171 ++++++++++++++++++ settings.gradle | 3 +- 5 files changed, 235 insertions(+), 1 deletion(-) create mode 100644 gateway/build.gradle create mode 100644 gateway/src/main/kotlin/Main.kt create mode 100644 gateway/src/main/kotlin/core/GatewayManager.kt diff --git a/.idea/gradle.xml b/.idea/gradle.xml index b0bb074..c6945e9 100644 --- a/.idea/gradle.xml +++ b/.idea/gradle.xml @@ -10,6 +10,7 @@ diff --git a/gateway/build.gradle b/gateway/build.gradle new file mode 100644 index 0000000..d62a3f0 --- /dev/null +++ b/gateway/build.gradle @@ -0,0 +1,36 @@ +plugins { + id 'org.jetbrains.kotlin.jvm' version '2.2.0' + id 'org.jetbrains.kotlin.plugin.serialization' version '2.2.0' + id 'com.gradleup.shadow' version '9.3.1' + id 'org.jetbrains.dokka' version '2.2.0' +} + +group = 'lab7.prog' +version = '1.0' + +repositories { + mavenCentral() +} + +dependencies { + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0") + implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.7.0") + implementation project(':common') +} + +test { + useJUnitPlatform() +} + + +shadowJar { + archiveBaseName.set('Lab7-gateway') + archiveVersion.set('1.0') + manifest { + attributes 'Main-Class': 'MainKt' + } +} + +kotlin { + jvmToolchain(17) +} \ No newline at end of file diff --git a/gateway/src/main/kotlin/Main.kt b/gateway/src/main/kotlin/Main.kt new file mode 100644 index 0000000..555a9cd --- /dev/null +++ b/gateway/src/main/kotlin/Main.kt @@ -0,0 +1,25 @@ +import core.BackendServer +import core.GatewayManager + +fun main(args: Array) { + val listenPort = if (args.isNotEmpty()) args[0].toInt() else 9090 + + val backends = listOf( + BackendServer("localhost", 8080), + BackendServer("localhost", 8081), + ) + + val gateway = GatewayManager( + listenPort = listenPort, + backends = backends, + healthCheckIntervalMs = 5_000L, + probeTimeoutMs = 2_000 + ) + + Runtime.getRuntime().addShutdownHook(Thread { + gateway.stop() + }) + + gateway.startHealthChecks() + gateway.start() +} diff --git a/gateway/src/main/kotlin/core/GatewayManager.kt b/gateway/src/main/kotlin/core/GatewayManager.kt new file mode 100644 index 0000000..fbd5f76 --- /dev/null +++ b/gateway/src/main/kotlin/core/GatewayManager.kt @@ -0,0 +1,171 @@ +package core + +import network.AppJson +import network.Request +import java.net.DatagramPacket +import java.net.DatagramSocket +import java.net.InetAddress +import java.util.concurrent.Executors +import java.util.concurrent.ForkJoinPool +import java.util.concurrent.RecursiveAction +import java.util.concurrent.RecursiveTask +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger + +data class BackendServer(val host: String, val port: Int) { + @Volatile var isAlive: Boolean = false + override fun toString() = "$host:$port" +} + +class GatewayManager( + private val listenPort: Int, + private val backends: List, + private val healthCheckIntervalMs: Long = 5_000L, + private val probeTimeoutMs: Int = 2_000 +) { + private val socket = DatagramSocket(listenPort) + private val roundRobinCounter = AtomicInteger(0) + + private val readerPool = Executors.newFixedThreadPool(4) + private val processingPool = ForkJoinPool(Runtime.getRuntime().availableProcessors()) + private val senderPool = ForkJoinPool(Runtime.getRuntime().availableProcessors()) + private val healthPool = Executors.newScheduledThreadPool(2) + + fun startHealthChecks() { + println("[Gateway] Initial health check...") + backends.forEach { probeServer(it) } + println("[Gateway] Alive: ${backends.filter { it.isAlive }.map { "$it" }}") + + healthPool.scheduleAtFixedRate( + { backends.forEach { probeServer(it) } }, + healthCheckIntervalMs, + healthCheckIntervalMs, + TimeUnit.MILLISECONDS + ) + } + + private fun probeServer(backend: BackendServer) { + val wasAlive = backend.isAlive + backend.isAlive = try { + val probe = AppJson.encodeToString( + Request.serializer(), + Request(commandName = "info", jwt = "no token") + ).toByteArray(Charsets.UTF_8) + + val bs = DatagramSocket() + bs.soTimeout = probeTimeoutMs + bs.send(DatagramPacket(probe, probe.size, InetAddress.getByName(backend.host), backend.port)) + + val buf = ByteArray(65507) + bs.receive(DatagramPacket(buf, buf.size)) + bs.close() + true + } catch (e: Exception) { + false + } + + if (wasAlive != backend.isAlive) + println("[Gateway] $backend -> ${if (backend.isAlive) "UP" else "DOWN"}") + } + + private fun pickBackend(): BackendServer? { + val alive = backends.filter { it.isAlive } + if (alive.isEmpty()) return null + return alive[roundRobinCounter.getAndIncrement() % alive.size] + } + + fun start() { + println("[Gateway] Listening on port $listenPort") + repeat(4) { readerPool.submit(::readLoop) } + try { + Thread.currentThread().join() + } catch (_: InterruptedException) {} + } + + private fun readLoop() { + val buf = ByteArray(65507) + while (!socket.isClosed) { + try { + val packet = DatagramPacket(buf.copyOf(), buf.size) + socket.receive(packet) + + val address = packet.address + val clientPort = packet.port + val data = packet.data.copyOf(packet.length) + + processingPool.submit(ProxyTask(data, address, clientPort)) + } catch (e: Exception) { + if (!socket.isClosed) println("[Gateway] Read error: ${e.message}") + else break + } + } + } + + inner class ProxyTask( + private val data: ByteArray, + private val clientAddress: InetAddress, + private val clientPort: Int + ) : RecursiveTask() { + + override fun compute() { + val backend = pickBackend() + if (backend == null) { + val err = """{"success":false,"message":"No alive servers"}""" + .toByteArray(Charsets.UTF_8) + senderPool.submit(SendTask(err, clientAddress, clientPort)) + return + } + + try { + val bs = DatagramSocket() + bs.soTimeout = probeTimeoutMs + bs.send(DatagramPacket(data, data.size, InetAddress.getByName(backend.host), backend.port)) + + val respBuf = ByteArray(65507) + val respPacket = DatagramPacket(respBuf, respBuf.size) + bs.receive(respPacket) + bs.close() + + senderPool.submit(SendTask(respPacket.data.copyOf(respPacket.length), clientAddress, clientPort)) + + } catch (e: Exception) { + println("[Gateway] Backend $backend failed: ${e.message}") + backend.isAlive = false + + val fallback = pickBackend() + if (fallback != null) { + processingPool.submit(ProxyTask(data, clientAddress, clientPort)) + } else { + val err = """{"success":false,"message":"All backends unavailable"}""" + .toByteArray(Charsets.UTF_8) + senderPool.submit(SendTask(err, clientAddress, clientPort)) + } + } + } + } + + inner class SendTask( + private val bytes: ByteArray, + private val address: InetAddress, + private val port: Int + ) : RecursiveAction() { + + override fun compute() { + try { + val packet = DatagramPacket(bytes, bytes.size, address, port) + synchronized(socket) { socket.send(packet) } + } catch (e: Exception) { + println("[Gateway] Send error: ${e.message}") + } + } + } + + fun stop() { + println("[Gateway] Shutting down...") + socket.close() + readerPool.shutdownNow() + processingPool.shutdown() + senderPool.shutdown() + healthPool.shutdownNow() + } +} diff --git a/settings.gradle b/settings.gradle index 08d52da..e834d66 100644 --- a/settings.gradle +++ b/settings.gradle @@ -4,4 +4,5 @@ plugins { rootProject.name = 'Lab7' include 'client' include 'server' -include 'common' \ No newline at end of file +include 'common' +include 'gateway' \ No newline at end of file