123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296 |
- package co.id.datacomsolusindo.ipphonebridge
- import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
- import org.apache.commons.io.IOUtils
- import org.springframework.core.io.ByteArrayResource
- import org.springframework.core.io.Resource
- import org.springframework.http.HttpEntity
- import org.springframework.http.HttpHeaders
- import org.springframework.http.HttpMethod
- import org.springframework.http.HttpStatus
- import org.springframework.messaging.converter.MappingJackson2MessageConverter
- import org.springframework.messaging.simp.SimpMessagingTemplate
- import org.springframework.messaging.simp.stomp.StompCommand
- import org.springframework.messaging.simp.stomp.StompHeaders
- import org.springframework.messaging.simp.stomp.StompSession
- import org.springframework.messaging.simp.stomp.StompSessionHandler
- import org.springframework.util.LinkedMultiValueMap
- import org.springframework.util.MultiValueMap
- import org.springframework.web.client.HttpStatusCodeException
- import org.springframework.web.client.RestTemplate
- import org.springframework.web.socket.client.standard.StandardWebSocketClient
- import org.springframework.web.socket.messaging.WebSocketStompClient
- import java.io.Serializable
- import java.lang.reflect.Type
- import java.net.URI
- import java.nio.charset.Charset
- import java.util.*
- import java.util.concurrent.ConcurrentHashMap
- import java.util.concurrent.ConcurrentMap
/**
 * STOMP-over-WebSocket client that connects this bridge to the gateway socket server.
 *
 * [connect] starts an asynchronous connection attempt against
 * `ws://<socketHost>:<socketPort>/_websocket`. Progress is published through [state] and the
 * resulting session through [session].
 *
 * @property socketHost host name of the gateway socket server.
 * @property clientName name of this client; it is not read inside this class.
 * @property currentLocalPort local port; it is not read inside this class.
 * @property socketPort port of the gateway socket server.
 * @property srvTemplate messaging template handed to the [BridgeStompSessionHandler] created by [connect].
 */
class GatewayClient(
    val socketHost: String,
    val clientName: String,
    val currentLocalPort: Int,
    val socketPort: Int,
    val srvTemplate: SimpMessagingTemplate
) {
    /**
     * Connection life cycle as tracked by [connect]:
     * - [INITIALIZED]: initial value, [connect] has not been called yet;
     * - [CONNECTING]: set at the start of [connect];
     * - [CONNECT]: set by the success callback once the session reports `isConnected`;
     * - [FAILED]: set by the failure callback or when [connect] itself throws.
     */
    enum class State { INITIALIZED, CONNECTING, FAILED, CONNECT }

    /** Current connection state; updated by [connect] and by its connection callbacks. */
    var state = State.INITIALIZED

    /** Session of the most recent connection attempt, or `null` when none has been obtained. */
    var session: StompSession? = null

    /**
     * Starts a connection attempt to the gateway socket server.
     *
     * The call does not wait for the connection to be established: the outcome is reported through
     * the callbacks registered on the returned future, which update [state] and [session].
     * Exceptions thrown while setting up the attempt are logged and turn [state] into
     * [State.FAILED]; they are not rethrown. No retry is scheduled here, despite the wording of
     * the log message.
     */
    fun connect() {
        try {
            state = State.CONNECTING

            val url = "ws://$socketHost:$socketPort/_websocket"
            val stompClient = WebSocketStompClient(StandardWebSocketClient())
            stompClient.messageConverter = MappingJackson2MessageConverter()
            val sessionHandler = BridgeStompSessionHandler(srvTemplate)

            AppLog.write(this.javaClass).info("Try connect to : $url")
            val cn = stompClient.connect(url, sessionHandler)
            cn.addCallback({ s ->
                session = s
                if (s != null && s.isConnected) {
                    AppLog.write(this.javaClass).info("Socket is connected.")
                    state = State.CONNECT
                }
            }, { e ->
                AppLog.write(this.javaClass).info("Can't connect to socket server.")
                AppLog.write(this.javaClass).error(e.message, e)
                state = State.FAILED
            })

            // Adopt the handler's session only when it is already available. Assigning it
            // unconditionally could replace a session stored by the success callback with null,
            // because the handler's field stays null until afterConnected has run.
            sessionHandler.session?.let { session = it }
        } catch (e: Exception) {
            AppLog.write(this.javaClass)
                .info("Can't connect to socket server, check your configuration and log. retry in 5 second")
            AppLog.write(this.javaClass).error(e.message, e)
            state = State.FAILED
        }
    }
}
/**
 * STOMP session handler used by the bridge's gateway connection.
 *
 * It logs session and transport errors, remembers the connected [session], and dispatches incoming
 * frames in [handleFrame]:
 * - frames on `/topic/partial/<client>` are re-published on the same destination through [srvTemplate];
 * - frames on `/topic/response/<requestId>` are handed to `SocketChecker().collectChunk(...)`.
 *
 * The commented-out code in this class is inactive and retained as found.
 *
 * @property srvTemplate template used to re-publish partial frames.
 */
class BridgeStompSessionHandler(val srvTemplate: SimpMessagingTemplate) : StompSessionHandler {

    /** Session received in [afterConnected]; `null` until that callback has run. */
    var session: StompSession? = null

    // val partialDataMap: ConcurrentMap<String, MutableList<PartialData>> = ConcurrentHashMap()
    // val partialDataMap: ConcurrentMap<String, ConcurrentMap<Int, PartialData>> = ConcurrentHashMap()

    /** Logs an error raised while handling a STOMP frame; the error is not propagated from here. */
    override fun handleException(
        session: StompSession,
        command: StompCommand?,
        headers: StompHeaders,
        btar: ByteArray,
        error: Throwable
    ) {
        AppLog.write(this.javaClass).info("Socket Handle Error")
        AppLog.write(this.javaClass).error("Socket Handle Error", error)
    }

    /** Logs a transport-level error; no reconnect is attempted here. */
    override fun handleTransportError(session: StompSession, error: Throwable) {
        AppLog.write(this.javaClass).info("Socket Transport Error")
        AppLog.write(this.javaClass).error("Socket Transport Error", error)
    }

    /** Declares [Any] as the payload type for every incoming frame, regardless of [headers]. */
    override fun getPayloadType(headers: StompHeaders): Type {
        return Any::class.java
    }

    /** Logs the successful connection and stores the session [ss] in [session]. */
    override fun afterConnected(ss: StompSession, sh: StompHeaders) {
        AppLog.write(this.javaClass).info("Connected to gateway socket server")
        // ss.subscribe("/topic/partial/$clientName", this)
        // ss.subscribe("/topic/healthCheck", this)
        this.session = ss
    }

    /**
     * Dispatches an incoming frame according to its destination.
     *
     * Frames without a payload or without a destination are ignored. The destination has to be
     * exactly `/topic/partial/<key>` for a key held by `ClientHolder`, or exactly
     * `/topic/response/<key>` for a key held by `Singletons.requestInstance`; anything else is
     * ignored.
     *
     * Keys are compared against the complete destination rather than by suffix: a suffix test can
     * pick a key that is merely the tail of the real one (for example `1` for `.../11`), after
     * which the exact match fails and the frame is dropped.
     *
     * @param stompHeaders headers of the frame; only the destination is read.
     * @param payload converted frame payload, or `null`.
     */
    override fun handleFrame(stompHeaders: StompHeaders, payload: Any?) {
        // "/topic/incoming/clnum"
        if (payload == null) return
        val destination = stompHeaders.destination ?: return

        val clientDest = ClientHolder.get().map { it.key }
            .firstOrNull { destination == "/topic/partial/$it" }
        if (clientDest != null) {
            srvTemplate.convertAndSend("/topic/partial/$clientDest", payload)
            return
        }

        val reqId = Singletons.requestInstance.entries.map { it.key }
            .firstOrNull { destination == "/topic/response/$it" }
        if (reqId != null) {
            // NOTE(review): getPayloadType declares Any, so the converter may not deliver a
            // SocketChunkData instance here and this cast could throw ClassCastException - confirm.
            SocketChecker().collectChunk(payload as SocketChunkData, reqId)
        }

        // when (stompHeaders.destination) {
        //     "/topic/healthCheck" -> {
        //         if ((1..100).random() == 5) {
        //             val msgObj = payload as LinkedHashMap<*, *>
        //             AppLog.write(javaClass).info("Socket Health Check " + msgObj["Hi"])
        //         }
        //     }
        //     "/topic/request/$clientName" -> {
        //         //process
        //         this.session!!.send("/app/confirm/req", "OK")
        //     }
        //
        //     "/topic/partial/$clientName" -> {
        //         val msgObj = payload as LinkedHashMap<*, *>
        //         val partData = PartialData(
        //             msgObj["id"] as String,
        //             msgObj["total"] as Int,
        //             msgObj["idx"] as Int,
        //             msgObj["data"] as String,
        //             msgObj["startTime"] as Long
        //         )
        //
        //         val pt = partialDataMap.getOrPut(payload["id"] as String) {
        //             ConcurrentHashMap()
        //         }
        //         pt.getOrPut(partData.idx){partData}
        //         val listChunk = pt.entries.sortedBy { it.key }.map { it.value }
        //         if (listChunk.isNotEmpty() && listChunk.size == listChunk[0].total) {
        //             val rbJs = listChunk.joinToString("") { it.data }
        //
        //             val mapper = jacksonObjectMapper()
        //             val rb = mapper.readValue(rbJs, RequestBuilder::class.java)
        //             requestBuilderProcessor(rb, listChunk[0].startTime)
        //         }
        //
        //     }
        //     else -> {
        //         AppLog.write(this.javaClass)
        //             .info("unknown message from ${stompHeaders.destination} with message $payload")
        //     }
        //
        // }
    }

    // fun requestBuilderProcessor(rb: RequestBuilder, startTime: Long) {
    //     var intStartTime = startTime
    //     val headers: MultiValueMap<String, String> = LinkedMultiValueMap<String, String>()
    //     rb.headers.entries.forEach { headers[it.key] = mutableListOf(it.value) }
    //     var url = "http://localhost:" + currentLocalPort + rb.path
    //     AppLog.write(this.javaClass).info("-----------request builder -----------")
    //     AppLog.write(this.javaClass).info(rb.body)
    //     AppLog.write(this.javaClass).info(rb.headers)
    //     AppLog.write(this.javaClass).info(rb.parts)
    //     AppLog.write(this.javaClass).info(rb.queryString)
    //     AppLog.write(this.javaClass).info("-----------end builder -----------")
    //
    //     val queryString = rb.queryString
    //
    //     if (queryString != null) {
    //         url = "$url?$queryString"
    //     }
    //
    //     val urlSend = URI(url)
    //     AppLog.write(this.javaClass).info("socket -> url to access $url")
    //
    //     val httpEnt = rb.parts?.let {
    //         val bodyPart: MultiValueMap<String, Any> = LinkedMultiValueMap()
    //         it.forEach { fp ->
    //             bodyPart.add(
    //                 fp.name,
    //                 FileNameAwareByteArrayResource(fp.fileName, Base64.getDecoder().decode(fp.data), null)
    //             )
    //         }
    //         HttpEntity(bodyPart, headers)
    //     } ?: HttpEntity(rb.body?.replace("\r\n", "\n")?.replace("\r", "\n")?.replace("\n", "\r\n"), headers)
    //
    //     val d1 = (System.nanoTime() - intStartTime).toDouble() / 1000000 //divide by 1000000 to get milliseconds.
    //
    //     AppLog.write(this.javaClass).info("Socket prc preparation. Time: $d1, ID: ${rb.id}")
    //     intStartTime = System.nanoTime()
    //
    //     var hd: HttpHeaders? = null
    //     var stat: Int
    //     var bdz: ByteArray?
    //
    //     try {
    //         val response = restTemplate.exchange(urlSend, rb.method, httpEnt, Resource::class.java)
    //         bdz = if (response.body != null) {
    //             IOUtils.toByteArray(response.body!!.inputStream)
    //         } else {
    //             ByteArray(0)
    //         }
    //         hd = response.headers
    //         stat = response.statusCode.value()
    //     } catch (httpError: HttpStatusCodeException) {
    //         AppLog.write(this.javaClass).info("http error + ${httpError.message}")
    //         AppLog.write(this.javaClass).error("http error", httpError)
    //         bdz = httpError.responseBodyAsByteArray
    //         hd = httpError.responseHeaders
    //         stat = httpError.statusCode.value()
    //
    //     }
    //     catch (ex: Exception) {
    //         AppLog.write(this.javaClass).info("request error")
    //         AppLog.write(this.javaClass).error("request error", ex)
    //         stat = HttpStatus.BAD_REQUEST.ordinal
    //         bdz = "unknown error".toByteArray(Charset.defaultCharset())
    //     }
    //
    //     val d2 = (System.nanoTime() - intStartTime).toDouble() / 1000000 //divide by 1000000 to get milliseconds.
    //     AppLog.write(this.javaClass).info("Socket loop internal prc. Time: $d2, ID: ${rb.id}")
    //     intStartTime = System.nanoTime()
    //
    //     this.session?.let {
    //         val chunkSize = 2000
    //         if (bdz != null && bdz.size > chunkSize) {
    //             val chunk = bdz.toList().chunked(chunkSize).map { bt -> bt.toByteArray() }
    //             chunk.forEachIndexed { idx, bt ->
    //                 if (idx < chunk.size - 1) {
    //                     it.send("/app/response/${rb.id}", SocketChunkData(bt, null, stat, idx, chunk.size))
    //                 } else {
    //                     it.send("/app/response/${rb.id}", SocketChunkData(bt, hd, stat, idx, chunk.size))//send
    //                 }
    //             }
    //         } else {
    //             //send
    //             it.send("/app/response/${rb.id}", SocketChunkData(bdz, hd, stat, 1, 1))
    //         }
    //     }
    //     val d3 = (System.nanoTime() - intStartTime).toDouble() / 1000000 //divide by 1000000 to get milliseconds.
    //     AppLog.write(this.javaClass).info("Socket send response. Time: $d3, ID: ${rb.id}")
    //     val duration = (System.nanoTime() - startTime).toDouble() / 1000000 //divide by 1000000 to get milliseconds.
    //     AppLog.write(this.javaClass).info("Bridge total process Success. Time: $duration, ID: ${rb.id}, URL: $urlSend")
    //
    // }
    //
    // class FileNameAwareByteArrayResource(private val fName: String, byteArray: ByteArray, description: String?) :
    //     ByteArrayResource(byteArray, description) {
    //     override fun getFilename() = fName
    // }
    //
    // class PartialData(val id: String, val total: Int, val idx: Int, val data: String, val startTime: Long)
    //
    // class RequestBuilder(
    //     val id: String,
    //     val path: String,
    //     var method: HttpMethod,
    //     var headers: MutableMap<String, String>
    // ) : Serializable {
    //     var body: String? = null
    //     var queryString: String? = null
    //     var parts: MutableList<FilePart>? = null
    // }
    //
    // class FilePart(val name: String, val fileName: String, val data: String)
    //
    // class SocketChunkData(
    //     val body: ByteArray?,
    //     val header: HttpHeaders?,
    //     val status: Int,
    //     val part: Int,
    //     val totalPart: Int
    // )
    // val messageMap: ConcurrentMap<String, MutableList<Message>> = ConcurrentHashMap()
}
|