| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- package com.datacomsolusindo.migration.service
- import com.datacomsolusindo.cpx_shared_code.utility.CacheData
- import com.datacomsolusindo.cpx_shared_code.utility.SimpleLogger
- import com.datacomsolusindo.migration.MigrationEntity
- import com.datacomsolusindo.migration.data.FinalizedDataService
- import com.datacomsolusindo.migration.data.InsertDataService
- import com.datacomsolusindo.migration.model.MigrationFileData
- import com.datacomsolusindo.migration.queueInsertData
- import org.springframework.stereotype.Service
- import java.util.concurrent.Executors
- import java.util.concurrent.LinkedBlockingQueue
- import kotlin.io.path.name
- import kotlin.system.exitProcess
- import kotlin.time.measureTimedValue
/**
 * Single-thread executor. Judging by its name it is meant for the data-preparation stage;
 * nothing in this part of the file submits work to it — confirm against its callers.
 */
val executorPrepareData: java.util.concurrent.ExecutorService = Executors.newSingleThreadExecutor()

/** Single-thread executor on which [WorkerMigrationService] runs its queue-draining loops. */
val executorInsertData: java.util.concurrent.ExecutorService = Executors.newSingleThreadExecutor()

/**
 * Bounded blocking queue (capacity 100) of [MigrationFileData] entries.
 * Presumably holds files waiting to be prepared — confirm against the producer/consumer.
 */
val queuePrepareMigrationData: LinkedBlockingQueue<MigrationFileData> = LinkedBlockingQueue(100)

/**
 * Bounded blocking queue (capacity 100) pairing a [MigrationFileData] with the rows to insert for it.
 * Drained by [WorkerMigrationService.queueProcessingInsertDataMigration].
 */
val queueInsertMigrationData: LinkedBlockingQueue<Pair<MigrationFileData, List<MutableMap<String, Any?>>>> =
    LinkedBlockingQueue(100)
/**
 * Background workers that drain the migration queues and write their contents through the
 * injected services.
 *
 * Each worker is an endless loop submitted to the shared [executorInsertData] executor; it blocks
 * on its queue until the next item arrives.
 *
 * NOTE(review): [executorInsertData] is a single-thread executor and neither loop ever returns.
 * If both [startInsertWorker] and [queueProcessingInsertDataMigration] are started in the same
 * process, the one submitted second never gets a thread. Confirm that only one is started per
 * run, or give each worker its own executor.
 */
@Service
class WorkerMigrationService(
    private val migrationEntity: MigrationEntity,
    private val finalizedDataService: FinalizedDataService,
    private val insertDataService: InsertDataService
) {
    private val logger = SimpleLogger.getLogger(this::class.java)

    /**
     * Submits an endless consumer of [queueInsertData] to [executorInsertData].
     *
     * Every queued item is handed to [MigrationEntity.insertData]; the size of the collection it
     * returns is logged as the number of failed rows. After a successful call the cache entry
     * keyed by the entity's simple name is updated with `false`. A failing item is logged and the
     * loop continues with the next one.
     */
    fun startInsertWorker() {
        executorInsertData.submit {
            while (true) {
                // take() blocks until an item is available; an interrupt here ends the worker.
                val data = queueInsertData.take()
                try {
                    val process = measureTimedValue { migrationEntity.insertData(data.first, data.second) }
                    val total = data.second.size
                    val failed = process.value.size
                    logger.info(
                        "add data migration ${data.first.simpleName} " +
                            "data $total " +
                            "success ${total - failed} " +
                            "failed $failed " +
                            "takes time ${process.duration.inWholeMilliseconds}ms"
                    )
                    CacheData.update(data.first.simpleName, false)
                } catch (ex: Exception) {
                    logger.error("Insert failed", ex)
                }
            }
        }
    }

    /**
     * Submits an endless consumer of [queueInsertMigrationData] to [executorInsertData].
     *
     * For every queued table the rows are split into chunks of [INSERT_CHUNK_SIZE] and each row is
     * passed to [FinalizedDataService.buildToInsertData]; the values it returns are summed and
     * logged as the chunk's success count. For the `calltransaction` table,
     * [InsertDataService.updateRawData] and [FinalizedDataService.prepareDataCallTransaction] run
     * first.
     *
     * Once a table has been processed its migration file name is removed from
     * `runningTableMigration`; when that collection becomes empty the JVM is terminated.
     *
     * Any exception raised while handling a table — including the `calltransaction`
     * preparation — is logged and the worker moves on to the next queued table.
     *
     * NOTE(review): a table that fails is never removed from `runningTableMigration`, so the
     * process will not exit on its own after a failure. Confirm this is intended.
     */
    fun queueProcessingInsertDataMigration() {
        executorInsertData.submit {
            while (true) {
                // take() blocks until an item is available; an interrupt here ends the worker.
                val migrationData = queueInsertMigrationData.take()
                val attribute = migrationData.first
                val rows = migrationData.second
                try {
                    // Kept inside the try: an exception thrown by a submitted task is captured by
                    // its Future, so a failure here would otherwise end this loop without a log
                    // line and leave the queue undrained.
                    if (attribute.table == CALL_TRANSACTION_TABLE) {
                        insertDataService.updateRawData()
                        finalizedDataService.prepareDataCallTransaction()
                    }

                    val chunks = rows.chunked(INSERT_CHUNK_SIZE)
                    logger.info("migration table ${attribute.table} with total data ${rows.size} and chunk data ${chunks.size}")

                    chunks.forEach { chunk ->
                        val process = measureTimedValue {
                            orderForInsert(chunk).fold(0) { inserted, row ->
                                inserted + finalizedDataService.buildToInsertData(
                                    attribute.table,
                                    row,
                                    attribute.unique
                                )
                            }
                        }
                        val success = process.value
                        logger.info(
                            "add data migration ${attribute.table} " +
                                "data ${chunk.size} " +
                                "success $success " +
                                "failed ${chunk.size - success} " +
                                "takes time ${process.duration.inWholeMilliseconds}ms"
                        )
                    }

                    runningTableMigration.remove(attribute.migrationFile.name)
                    if (runningTableMigration.isEmpty()) {
                        // NOTE(review): exit status 1 conventionally signals failure, yet this is
                        // the "all tables processed" path. Left unchanged in case a launcher
                        // depends on it — confirm.
                        exitProcess(1)
                    }
                } catch (ex: Exception) {
                    logger.error("Insert failed", ex)
                }
            }
        }
    }

    /**
     * Returns [chunk] ordered for insertion: when the first row has a key exactly equal to
     * `structure`, rows are sorted by the length of that value's string form (shortest first);
     * otherwise the chunk is returned unchanged.
     *
     * NOTE(review): ordering is applied per chunk only, not across the whole table. If the intent
     * is that shorter structures are always inserted before longer ones, the sort would need to
     * happen before chunking — confirm.
     */
    private fun orderForInsert(chunk: List<MutableMap<String, Any?>>): List<MutableMap<String, Any?>> {
        // The key comparison is deliberately done on the key set (exact match), as before.
        val hasStructure = chunk.firstOrNull()?.keys?.any { key -> key == "structure" } == true
        return if (hasStructure) chunk.sortedBy { row -> row["structure"].toString().length } else chunk
    }

    private companion object {
        /** Number of rows handled per chunk (and per log line) when inserting a table. */
        const val INSERT_CHUNK_SIZE = 500

        /** Table name that requires the extra preparation step before its rows are inserted. */
        const val CALL_TRANSACTION_TABLE = "calltransaction"
    }
}
|