herlanS_ пре 3 година
родитељ
комит
a2ee1ccfb9

+ 38 - 0
src/main/kotlin/co/id/datacomsolusindo/ipphonebridge/ClientHolder.kt

@@ -3,6 +3,8 @@ package co.id.datacomsolusindo.ipphonebridge
 import kotlinx.coroutines.*
 import kotlinx.coroutines.sync.Mutex
 import kotlinx.coroutines.sync.withLock
+import org.springframework.messaging.simp.stomp.StompFrameHandler
+import org.springframework.messaging.simp.stomp.StompSession
 import java.time.LocalDateTime
 import java.time.format.DateTimeFormatter
 import java.util.concurrent.ConcurrentHashMap
@@ -12,6 +14,19 @@ import kotlin.system.measureTimeMillis
 object ClientHolder {
     private val mutexSucReq = Mutex()
     private val clientMap: ConcurrentMap<String, Client> by lazy { ConcurrentHashMap() }
+    private val subsList: ConcurrentMap<String, StompSession.Subscription> by lazy { ConcurrentHashMap() }
+
+    private var onPut: (key: String) -> Unit = {}
+    private var onRemove: (key: String) -> Unit = {}
+
+    fun setOnPut(x: (key: String) -> Unit) {
+        onPut = x
+    }
+
+    fun setOnRemove(x: (key: String) -> Unit) {
+        onRemove = x
+    }
+
     private suspend fun massiveRun(action: suspend () -> Unit) {
         measureTimeMillis {
             coroutineScope { // sc
@@ -22,15 +37,38 @@ object ClientHolder {
         }
     }
 
+    fun subsAll(ss: StompSession, handler: StompFrameHandler) {
+        clientMap.forEach() {
+            subsList.getOrPut(it.key) { ss.subscribe("/topic/partial/${it.key}", handler) }
+        }
+    }
+
+    fun unSubsAll() {
+        subsList.forEach() {
+            it.value.unsubscribe()
+        }
+    }
+
+    fun addSubs(key: String, subs: StompSession.Subscription) {
+        subsList.getOrPut(key) { subs }
+    }
+
+    fun removeSubs(key: String) {
+        subsList.filter { it.key == key }.forEach { it.value.unsubscribe() }
+    }
+
+
     fun removeBySessionId(sessionId: String?) {
         clientMap.entries.filter { it.value.sessionId == sessionId }.forEach {
             clientMap.remove(it.key)
+            onRemove(it.key)
             //unsubscribe "/topic/partial/client"
         }
     }
 
     fun put(key: String, client: Client) {
         clientMap.getOrPut(key) { client }
+        onPut(key)
         //subscribe  "/topic/partial/client"
     }
 

+ 10 - 11
src/main/kotlin/co/id/datacomsolusindo/ipphonebridge/GatewayClient.kt

@@ -29,11 +29,8 @@ import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.ConcurrentMap
 
 class GatewayClient(
-    val socketHost: String,
-    val clientName: String,
-    val currentLocalPort: Int,
-    val socketPort: Int,
-    val srvTemplate: SimpMessagingTemplate
+    val socketUrl: String,
+    private val srvTemplate: SimpMessagingTemplate
 ) {
     enum class State { INITIALIZED, CONNECTING, FAILED, CONNECT }
 
@@ -49,8 +46,8 @@ class GatewayClient(
             val sessionHandler =
                 BridgeStompSessionHandler(srvTemplate)
 
-            AppLog.write(this.javaClass).info("Try connect to : ws://$socketHost:$socketPort/_websocket")
-            val cn = stompClient.connect("ws://$socketHost:$socketPort/_websocket", sessionHandler)
+            AppLog.write(this.javaClass).info("Try connect to : $socketUrl")
+            val cn = stompClient.connect(socketUrl, sessionHandler)
 
             cn.addCallback({ s ->
                 session = s
@@ -73,7 +70,7 @@ class GatewayClient(
     }
 }
 
-class BridgeStompSessionHandler(val srvTemplate: SimpMessagingTemplate) : StompSessionHandler {
+class BridgeStompSessionHandler(private val srvTemplate: SimpMessagingTemplate) : StompSessionHandler {
     var session: StompSession? = null
 
     //    val partialDataMap: ConcurrentMap<String, MutableList<PartialData>> = ConcurrentHashMap()
@@ -101,8 +98,10 @@ class BridgeStompSessionHandler(val srvTemplate: SimpMessagingTemplate) : StompS
 
     override fun afterConnected(ss: StompSession, sh: StompHeaders) {
         AppLog.write(this.javaClass).info("Connected to gateway socket server")
-//        ss.subscribe("/topic/partial/$clientName", this)
-//        ss.subscribe("/topic/healthCheck", this)
+        ClientHolder.setOnPut { ClientHolder.addSubs(it, ss.subscribe("/topic/partial/$it", this)) }
+        ClientHolder.setOnRemove { ClientHolder.removeSubs(it) }
+        ClientHolder.subsAll(ss, this)
+        ss.subscribe("/topic/healthCheck", this)
         this.session = ss
     }
 
@@ -122,7 +121,7 @@ class BridgeStompSessionHandler(val srvTemplate: SimpMessagingTemplate) : StompS
                     )
                 }
                 "/topic/response/$reqId" -> {
-                    SocketChecker().collectChunk(payload as SocketChunkData,reqId)
+                    SocketChecker().collectChunk(payload as SocketChunkData, reqId)
                 }
             }
         }