WorkerMigrationService.kt 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. package com.datacomsolusindo.migration.service
  2. import com.datacomsolusindo.cpx_shared_code.utility.CacheData
  3. import com.datacomsolusindo.cpx_shared_code.utility.SimpleLogger
  4. import com.datacomsolusindo.migration.MigrationEntity
  5. import com.datacomsolusindo.migration.data.FinalizedDataService
  6. import com.datacomsolusindo.migration.data.InsertDataService
  7. import com.datacomsolusindo.migration.model.MigrationFileData
  8. import com.datacomsolusindo.migration.queueInsertData
  9. import org.springframework.stereotype.Service
  10. import java.util.concurrent.Executors
  11. import java.util.concurrent.LinkedBlockingQueue
  12. import kotlin.io.path.name
  13. import kotlin.system.exitProcess
  14. import kotlin.time.measureTimedValue
  15. val executorPrepareData = Executors.newSingleThreadExecutor()
  16. val executorInsertData = Executors.newSingleThreadExecutor()
  17. val queuePrepareMigrationData = LinkedBlockingQueue<MigrationFileData>(100)
  18. val queueInsertMigrationData = LinkedBlockingQueue<Pair<MigrationFileData, List<MutableMap<String, Any?>>>>(100)
  19. @Service
  20. class WorkerMigrationService(
  21. private val migrationEntity: MigrationEntity,
  22. private val finalizedDataService: FinalizedDataService,
  23. private val insertDataService: InsertDataService
  24. ) {
  25. private val logger = SimpleLogger.getLogger(this::class.java)
  26. fun startInsertWorker() {
  27. executorInsertData.submit {
  28. while (true) {
  29. val data = queueInsertData.take()
  30. try {
  31. val process = measureTimedValue { migrationEntity.insertData(data.first, data.second) }
  32. logger.info(
  33. "add data migration ${data.first.simpleName} " +
  34. "data ${data.second.size} " +
  35. "success ${data.second.size - process.value.size} " +
  36. "failed ${process.value.size} " +
  37. "takes time ${process.duration.inWholeMilliseconds}ms"
  38. )
  39. CacheData.update(data.first.simpleName, false)
  40. } catch (ex: Exception) {
  41. logger.error("Insert failed", ex)
  42. }
  43. }
  44. }
  45. }
  46. fun queueProcessingInsertDataMigration() {
  47. executorInsertData.submit {
  48. while (true) {
  49. val migrationData = queueInsertMigrationData.take()
  50. val attribute = migrationData.first
  51. if (attribute.table == "calltransaction") {
  52. // insertDataService.createTableCallTransactionTemporary()
  53. insertDataService.updateRawData()
  54. finalizedDataService.prepareDataCallTransaction()
  55. }
  56. val data = migrationData.second
  57. // .take(1)
  58. try {
  59. val chunkData = data.chunked(500)
  60. logger.info("migration table ${attribute.table} with total data ${data.size} and chunk data ${chunkData.size}")
  61. chunkData.forEach {
  62. var success = 0
  63. val process = measureTimedValue {
  64. it.let { dt ->
  65. if (dt.first().keys.any { a -> a == "structure" }) {
  66. dt.sortedBy { it["structure"].toString().length }
  67. } else dt
  68. }.forEach { ch ->
  69. val isSuccess = finalizedDataService.buildToInsertData(
  70. attribute.table,
  71. ch,
  72. attribute.unique
  73. )
  74. success += isSuccess
  75. }
  76. }
  77. logger.info(
  78. "add data migration ${attribute.table} " +
  79. "data ${it.size} " +
  80. "success $success " +
  81. "failed ${it.size - success} " +
  82. "takes time ${process.duration.inWholeMilliseconds}ms"
  83. )
  84. }
  85. //println("MIGRATION DATA END---")
  86. runningTableMigration.remove(attribute.migrationFile.name)
  87. if (runningTableMigration.isEmpty()) {
  88. exitProcess(1)
  89. }
  90. } catch (ex: Exception) {
  91. logger.error("Insert failed", ex)
  92. }
  93. }
  94. }
  95. }
  96. }