|
@@ -0,0 +1,296 @@
|
|
|
+package co.id.datacomsolusindo.ipphonebridge
|
|
|
+
|
|
|
+import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
|
|
|
+import org.apache.commons.io.IOUtils
|
|
|
+import org.springframework.core.io.ByteArrayResource
|
|
|
+import org.springframework.core.io.Resource
|
|
|
+import org.springframework.http.HttpEntity
|
|
|
+import org.springframework.http.HttpHeaders
|
|
|
+import org.springframework.http.HttpMethod
|
|
|
+import org.springframework.http.HttpStatus
|
|
|
+import org.springframework.messaging.converter.MappingJackson2MessageConverter
|
|
|
+import org.springframework.messaging.simp.SimpMessagingTemplate
|
|
|
+import org.springframework.messaging.simp.stomp.StompCommand
|
|
|
+import org.springframework.messaging.simp.stomp.StompHeaders
|
|
|
+import org.springframework.messaging.simp.stomp.StompSession
|
|
|
+import org.springframework.messaging.simp.stomp.StompSessionHandler
|
|
|
+import org.springframework.util.LinkedMultiValueMap
|
|
|
+import org.springframework.util.MultiValueMap
|
|
|
+import org.springframework.web.client.HttpStatusCodeException
|
|
|
+import org.springframework.web.client.RestTemplate
|
|
|
+import org.springframework.web.socket.client.standard.StandardWebSocketClient
|
|
|
+import org.springframework.web.socket.messaging.WebSocketStompClient
|
|
|
+import java.io.Serializable
|
|
|
+import java.lang.reflect.Type
|
|
|
+import java.net.URI
|
|
|
+import java.nio.charset.Charset
|
|
|
+import java.util.*
|
|
|
+import java.util.concurrent.ConcurrentHashMap
|
|
|
+import java.util.concurrent.ConcurrentMap
|
|
|
+
|
|
|
+class GatewayClient(
|
|
|
+ val socketHost: String,
|
|
|
+ val clientName: String,
|
|
|
+ val currentLocalPort: Int,
|
|
|
+ val socketPort: Int,
|
|
|
+ val srvTemplate: SimpMessagingTemplate
|
|
|
+) {
|
|
|
+ enum class State { INITIALIZED, CONNECTING, FAILED, CONNECT }
|
|
|
+
|
|
|
+ var state = State.INITIALIZED
|
|
|
+ var session: StompSession? = null
|
|
|
+
|
|
|
+ fun connect() {
|
|
|
+ try {
|
|
|
+ state = State.CONNECTING
|
|
|
+ val client = StandardWebSocketClient()
|
|
|
+ val stompClient = WebSocketStompClient(client)
|
|
|
+ stompClient.messageConverter = MappingJackson2MessageConverter()
|
|
|
+ val sessionHandler =
|
|
|
+ BridgeStompSessionHandler(srvTemplate)
|
|
|
+
|
|
|
+ AppLog.write(this.javaClass).info("Try connect to : ws://$socketHost:$socketPort/_websocket")
|
|
|
+ val cn = stompClient.connect("ws://$socketHost:$socketPort/_websocket", sessionHandler)
|
|
|
+
|
|
|
+ cn.addCallback({ s ->
|
|
|
+ session = s
|
|
|
+ if (s != null && s.isConnected) {
|
|
|
+ AppLog.write(this.javaClass).info("Socket is connected.")
|
|
|
+ state = State.CONNECT
|
|
|
+ }
|
|
|
+ }, { e ->
|
|
|
+ AppLog.write(this.javaClass).info("Can't connect to socket server.")
|
|
|
+ AppLog.write(this.javaClass).error(e.message, e)
|
|
|
+ state = State.FAILED
|
|
|
+ })
|
|
|
+ session = sessionHandler.session
|
|
|
+ } catch (e: Exception) {
|
|
|
+ AppLog.write(this.javaClass)
|
|
|
+ .info("Can't connect to socket server, check your configuration and log. retry in 5 second")
|
|
|
+ AppLog.write(this.javaClass).error(e.message, e)
|
|
|
+ state = State.FAILED
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+class BridgeStompSessionHandler(val srvTemplate: SimpMessagingTemplate) : StompSessionHandler {
|
|
|
+ var session: StompSession? = null
|
|
|
+
|
|
|
+ // val partialDataMap: ConcurrentMap<String, MutableList<PartialData>> = ConcurrentHashMap()
|
|
|
+// val partialDataMap: ConcurrentMap<String, ConcurrentMap<Int, PartialData>> = ConcurrentHashMap()
|
|
|
+
|
|
|
+ override fun handleException(
|
|
|
+ session: StompSession,
|
|
|
+ command: StompCommand?,
|
|
|
+ headers: StompHeaders,
|
|
|
+ btar: ByteArray,
|
|
|
+ error: Throwable
|
|
|
+ ) {
|
|
|
+ AppLog.write(this.javaClass).info("Socket Handle Error")
|
|
|
+ AppLog.write(this.javaClass).error("Socket Handle Error", error)
|
|
|
+ }
|
|
|
+
|
|
|
+ override fun handleTransportError(session: StompSession, error: Throwable) {
|
|
|
+ AppLog.write(this.javaClass).info("Socket Transport Error")
|
|
|
+ AppLog.write(this.javaClass).error("Socket Transport Error", error)
|
|
|
+ }
|
|
|
+
|
|
|
+ override fun getPayloadType(headers: StompHeaders): Type {
|
|
|
+ return Any::class.java
|
|
|
+ }
|
|
|
+
|
|
|
+ override fun afterConnected(ss: StompSession, sh: StompHeaders) {
|
|
|
+ AppLog.write(this.javaClass).info("Connected to gateway socket server")
|
|
|
+// ss.subscribe("/topic/partial/$clientName", this)
|
|
|
+// ss.subscribe("/topic/healthCheck", this)
|
|
|
+ this.session = ss
|
|
|
+ }
|
|
|
+
|
|
|
+ override fun handleFrame(stompHeaders: StompHeaders, payload: Any?) {
|
|
|
+// "/topic/incoming/clnum"
|
|
|
+ if (payload != null) {
|
|
|
+ val reqId = Singletons.requestInstance.entries.map { it.key }
|
|
|
+ .firstOrNull { stompHeaders.destination?.endsWith(it) ?: false } ?: ""
|
|
|
+ val clientDest =
|
|
|
+ ClientHolder.get().map { it.key }.firstOrNull { stompHeaders.destination?.endsWith(it) ?: false } ?: ""
|
|
|
+//
|
|
|
+ when (stompHeaders.destination) {
|
|
|
+ "/topic/partial/$clientDest" -> {
|
|
|
+ srvTemplate.convertAndSend(
|
|
|
+ "/topic/partial/$clientDest",
|
|
|
+ payload
|
|
|
+ )
|
|
|
+ }
|
|
|
+ "/topic/response/$reqId" -> {
|
|
|
+ SocketChecker().collectChunk(payload as SocketChunkData,reqId)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+// when (stompHeaders.destination) {
|
|
|
+// "/topic/healthCheck" -> {
|
|
|
+// if ((1..100).random() == 5) {
|
|
|
+// val msgObj = payload as LinkedHashMap<*, *>
|
|
|
+// AppLog.write(javaClass).info("Socket Health Check " + msgObj["Hi"])
|
|
|
+// }
|
|
|
+// }
|
|
|
+// "/topic/request/$clientName" -> {
|
|
|
+// //process
|
|
|
+// this.session!!.send("/app/confirm/req", "OK")
|
|
|
+// }
|
|
|
+//
|
|
|
+// "/topic/partial/$clientName" -> {
|
|
|
+// val msgObj = payload as LinkedHashMap<*, *>
|
|
|
+// val partData = PartialData(
|
|
|
+// msgObj["id"] as String,
|
|
|
+// msgObj["total"] as Int,
|
|
|
+// msgObj["idx"] as Int,
|
|
|
+// msgObj["data"] as String,
|
|
|
+// msgObj["startTime"] as Long
|
|
|
+// )
|
|
|
+//
|
|
|
+// val pt = partialDataMap.getOrPut(payload["id"] as String) {
|
|
|
+// ConcurrentHashMap()
|
|
|
+// }
|
|
|
+// pt.getOrPut(partData.idx){partData}
|
|
|
+// val listChunk = pt.entries.sortedBy { it.key }.map { it.value }
|
|
|
+// if (listChunk.isNotEmpty() && listChunk.size == listChunk[0].total) {
|
|
|
+// val rbJs = listChunk.joinToString("") { it.data }
|
|
|
+//
|
|
|
+// val mapper = jacksonObjectMapper()
|
|
|
+// val rb = mapper.readValue(rbJs, RequestBuilder::class.java)
|
|
|
+// requestBuilderProcessor(rb, listChunk[0].startTime)
|
|
|
+// }
|
|
|
+//
|
|
|
+// }
|
|
|
+// else -> {
|
|
|
+// AppLog.write(this.javaClass)
|
|
|
+// .info("unknown message from ${stompHeaders.destination} with message $payload")
|
|
|
+// }
|
|
|
+//
|
|
|
+// }
|
|
|
+ }
|
|
|
+
|
|
|
+// fun requestBuilderProcessor(rb: RequestBuilder, startTime: Long) {
|
|
|
+// var intStartTime = startTime
|
|
|
+// val headers: MultiValueMap<String, String> = LinkedMultiValueMap<String, String>()
|
|
|
+// rb.headers.entries.forEach { headers[it.key] = mutableListOf(it.value) }
|
|
|
+// var url = "http://localhost:" + currentLocalPort + rb.path
|
|
|
+// AppLog.write(this.javaClass).info("-----------request builder -----------")
|
|
|
+// AppLog.write(this.javaClass).info(rb.body)
|
|
|
+// AppLog.write(this.javaClass).info(rb.headers)
|
|
|
+// AppLog.write(this.javaClass).info(rb.parts)
|
|
|
+// AppLog.write(this.javaClass).info(rb.queryString)
|
|
|
+// AppLog.write(this.javaClass).info("-----------end builder -----------")
|
|
|
+//
|
|
|
+// val queryString = rb.queryString
|
|
|
+//
|
|
|
+// if (queryString != null) {
|
|
|
+// url = "$url?$queryString"
|
|
|
+// }
|
|
|
+//
|
|
|
+// val urlSend = URI(url)
|
|
|
+// AppLog.write(this.javaClass).info("socket -> url to access $url")
|
|
|
+//
|
|
|
+// val httpEnt = rb.parts?.let {
|
|
|
+// val bodyPart: MultiValueMap<String, Any> = LinkedMultiValueMap()
|
|
|
+// it.forEach { fp ->
|
|
|
+// bodyPart.add(
|
|
|
+// fp.name,
|
|
|
+// FileNameAwareByteArrayResource(fp.fileName, Base64.getDecoder().decode(fp.data), null)
|
|
|
+// )
|
|
|
+// }
|
|
|
+// HttpEntity(bodyPart, headers)
|
|
|
+// } ?: HttpEntity(rb.body?.replace("\r\n", "\n")?.replace("\r", "\n")?.replace("\n", "\r\n"), headers)
|
|
|
+//
|
|
|
+// val d1 = (System.nanoTime() - intStartTime).toDouble() / 1000000 //divide by 1000000 to get milliseconds.
|
|
|
+//
|
|
|
+// AppLog.write(this.javaClass).info("Socket prc preparation. Time: $d1, ID: ${rb.id}")
|
|
|
+// intStartTime = System.nanoTime()
|
|
|
+//
|
|
|
+// var hd: HttpHeaders? = null
|
|
|
+// var stat: Int
|
|
|
+// var bdz: ByteArray?
|
|
|
+//
|
|
|
+// try {
|
|
|
+// val response = restTemplate.exchange(urlSend, rb.method, httpEnt, Resource::class.java)
|
|
|
+// bdz = if (response.body != null) {
|
|
|
+// IOUtils.toByteArray(response.body!!.inputStream)
|
|
|
+// } else {
|
|
|
+// ByteArray(0)
|
|
|
+// }
|
|
|
+// hd = response.headers
|
|
|
+// stat = response.statusCode.value()
|
|
|
+// } catch (httpError: HttpStatusCodeException) {
|
|
|
+// AppLog.write(this.javaClass).info("http error + ${httpError.message}")
|
|
|
+// AppLog.write(this.javaClass).error("http error", httpError)
|
|
|
+// bdz = httpError.responseBodyAsByteArray
|
|
|
+// hd = httpError.responseHeaders
|
|
|
+// stat = httpError.statusCode.value()
|
|
|
+//
|
|
|
+// }
|
|
|
+// catch (ex: Exception) {
|
|
|
+// AppLog.write(this.javaClass).info("request error")
|
|
|
+// AppLog.write(this.javaClass).error("request error", ex)
|
|
|
+// stat = HttpStatus.BAD_REQUEST.ordinal
|
|
|
+// bdz = "unknown error".toByteArray(Charset.defaultCharset())
|
|
|
+// }
|
|
|
+//
|
|
|
+// val d2 = (System.nanoTime() - intStartTime).toDouble() / 1000000 //divide by 1000000 to get milliseconds.
|
|
|
+// AppLog.write(this.javaClass).info("Socket loop internal prc. Time: $d2, ID: ${rb.id}")
|
|
|
+// intStartTime = System.nanoTime()
|
|
|
+//
|
|
|
+// this.session?.let {
|
|
|
+// val chunkSize = 2000
|
|
|
+// if (bdz != null && bdz.size > chunkSize) {
|
|
|
+// val chunk = bdz.toList().chunked(chunkSize).map { bt -> bt.toByteArray() }
|
|
|
+// chunk.forEachIndexed { idx, bt ->
|
|
|
+// if (idx < chunk.size - 1) {
|
|
|
+// it.send("/app/response/${rb.id}", SocketChunkData(bt, null, stat, idx, chunk.size))
|
|
|
+// } else {
|
|
|
+// it.send("/app/response/${rb.id}", SocketChunkData(bt, hd, stat, idx, chunk.size))//send
|
|
|
+// }
|
|
|
+// }
|
|
|
+// } else {
|
|
|
+// //send
|
|
|
+// it.send("/app/response/${rb.id}", SocketChunkData(bdz, hd, stat, 1, 1))
|
|
|
+// }
|
|
|
+// }
|
|
|
+// val d3 = (System.nanoTime() - intStartTime).toDouble() / 1000000 //divide by 1000000 to get milliseconds.
|
|
|
+// AppLog.write(this.javaClass).info("Socket send response. Time: $d3, ID: ${rb.id}")
|
|
|
+// val duration = (System.nanoTime() - startTime).toDouble() / 1000000 //divide by 1000000 to get milliseconds.
|
|
|
+// AppLog.write(this.javaClass).info("Bridge total process Success. Time: $duration, ID: ${rb.id}, URL: $urlSend")
|
|
|
+//
|
|
|
+// }
|
|
|
+//
|
|
|
+// class FileNameAwareByteArrayResource(private val fName: String, byteArray: ByteArray, description: String?) :
|
|
|
+// ByteArrayResource(byteArray, description) {
|
|
|
+// override fun getFilename() = fName
|
|
|
+// }
|
|
|
+//
|
|
|
+// class PartialData(val id: String, val total: Int, val idx: Int, val data: String, val startTime: Long)
|
|
|
+//
|
|
|
+// class RequestBuilder(
|
|
|
+// val id: String,
|
|
|
+// val path: String,
|
|
|
+// var method: HttpMethod,
|
|
|
+// var headers: MutableMap<String, String>
|
|
|
+// ) : Serializable {
|
|
|
+// var body: String? = null
|
|
|
+// var queryString: String? = null
|
|
|
+// var parts: MutableList<FilePart>? = null
|
|
|
+// }
|
|
|
+//
|
|
|
+// class FilePart(val name: String, val fileName: String, val data: String)
|
|
|
+//
|
|
|
+// class SocketChunkData(
|
|
|
+// val body: ByteArray?,
|
|
|
+// val header: HttpHeaders?,
|
|
|
+// val status: Int,
|
|
|
+// val part: Int,
|
|
|
+// val totalPart: Int
|
|
|
+// )
|
|
|
+
|
|
|
+// val messageMap: ConcurrentMap<String, MutableList<Message>> = ConcurrentHashMap()
|
|
|
+
|
|
|
+
|
|
|
+}
|