• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

TouK / nussknacker / 5976637142

25 Aug 2023 01:43PM UTC coverage: 81.47% (+0.03%) from 81.438%
5976637142

push

github

Filemon279
Fix migration in 1.11

25 of 25 new or added lines in 2 files covered. (100.0%)

14865 of 18246 relevant lines covered (81.47%)

5.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.01
/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/repository/ProcessRepository.scala
1
package pl.touk.nussknacker.ui.process.repository
2

3
import akka.http.scaladsl.model.HttpHeader
4
import cats.data._
5
import cats.syntax.either._
6
import com.typesafe.scalalogging.LazyLogging
7
import db.util.DBIOActionInstances._
8
import io.circe.generic.JsonCodec
9
import pl.touk.nussknacker.engine.ModelData
10
import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessIdWithName, ProcessName, VersionId}
11
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
12
import pl.touk.nussknacker.restmodel.process.ProcessingType
13
import pl.touk.nussknacker.restmodel.processdetails.ProcessShapeFetchStrategy
14
import pl.touk.nussknacker.ui.EspError
15
import pl.touk.nussknacker.ui.EspError._
16
import pl.touk.nussknacker.ui.db.entity.{CommentActions, ProcessEntityData, ProcessVersionEntityData}
17
import pl.touk.nussknacker.ui.db.{DbRef, EspTables}
18
import pl.touk.nussknacker.ui.process.processingtypedata.ProcessingTypeDataProvider
19
import pl.touk.nussknacker.ui.process.repository.ProcessDBQueryRepository._
20
import pl.touk.nussknacker.ui.process.repository.ProcessRepository.{CreateProcessAction, ProcessCreated, ProcessUpdated, UpdateProcessAction}
21
import pl.touk.nussknacker.ui.security.api.LoggedUser
22
import pl.touk.nussknacker.ui.listener.Comment
23
import slick.dbio.DBIOAction
24

25
import java.sql.Timestamp
26
import java.time.Instant
27
import scala.concurrent.ExecutionContext.Implicits.global
28
import scala.language.higherKinds
29

30
object ProcessRepository {
31

32
  @JsonCodec case class RemoteUserName(name: String) extends AnyVal {
2✔
33
    def display: String = s"Remote[$name]"
34
  }
35

36
  object RemoteUserName {
37
    val headerName = "Remote-User-Name".toLowerCase
2✔
38

39
    def extractFromHeader: HttpHeader => Option[RemoteUserName] = {
40
      case HttpHeader(`headerName`, value) => Some(RemoteUserName(value))
×
41
      case _ => None
2✔
42
    }
43
  }
44

45
  def create(dbRef: DbRef, modelData: ProcessingTypeDataProvider[ModelData, _]): DBProcessRepository =
46
    new DBProcessRepository(dbRef, modelData.mapValues(_.migrations.version))
2✔
47

48
  case class CreateProcessAction(processName: ProcessName,
49
                                 category: String,
50
                                 canonicalProcess: CanonicalProcess,
51
                                 processingType: ProcessingType,
52
                                 isFragment: Boolean,
53
                                 forwardedUserName: Option[RemoteUserName])
54

55
  case class UpdateProcessAction(id: ProcessId,
56
                                 canonicalProcess: CanonicalProcess,
57
                                 comment: Option[Comment],
58
                                 increaseVersionWhenJsonNotChanged: Boolean,
59
                                 forwardedUserName: Option[RemoteUserName])
60

61
  case class ProcessUpdated(processId: ProcessId, oldVersion: Option[VersionId], newVersion: Option[VersionId])
62

63
  case class ProcessCreated(processId: ProcessId, processVersionId: VersionId)
64
}
65

66
trait ProcessRepository[F[_]] {
67

68
  def saveNewProcess(action: CreateProcessAction)(implicit loggedUser: LoggedUser): F[XError[Option[ProcessCreated]]]
69

70
  def updateProcess(action: UpdateProcessAction)(implicit loggedUser: LoggedUser): F[XError[ProcessUpdated]]
71

72
  def updateCategory(processId: ProcessId, category: String)(implicit loggedUser: LoggedUser): F[XError[Unit]]
73

74
  def archive(processId: ProcessId, isArchived: Boolean): F[XError[Unit]]
75

76
  def deleteProcess(processId: ProcessId): F[XError[Unit]]
77

78
  def renameProcess(processId: ProcessIdWithName, newName: ProcessName)(implicit loggedUser: LoggedUser): F[XError[Unit]]
79
}
80

81
class DBProcessRepository(val dbRef: DbRef, val modelVersion: ProcessingTypeDataProvider[Int, _])
82
  extends ProcessRepository[DB] with EspTables with LazyLogging with CommentActions with ProcessDBQueryRepository[DB] {
83

84
  import profile.api._
85

86
  // FIXME: It's temporary way.. After merge and refactor process repositories we can remove it.
87
  override def run[R]: DB[R] => DB[R] = identity
×
88

89
  /**
90
   * These action should be done on transaction - move it to ProcessService.createProcess
91
   */
92
  def saveNewProcess(action: CreateProcessAction)(implicit loggedUser: LoggedUser): DB[XError[Option[ProcessCreated]]] = {
93
    val userName = action.forwardedUserName.map(_.display).getOrElse(loggedUser.username)
×
94
    val processToSave = ProcessEntityData(
2✔
95
      id = ProcessId(-1L), name = action.processName, processCategory = action.category, description = None,
2✔
96
      processingType = action.processingType, isFragment = action.isFragment, isArchived = false,
2✔
97
      createdAt = Timestamp.from(now), createdBy = userName
2✔
98
    )
99

100
    val insertNew = processesTable.returning(processesTable.map(_.id)).into { case (entity, newId) => entity.copy(id = newId) }
2✔
101

102
    val insertAction = logDebug(s"Saving scenario ${action.processName.value} by user $userName").flatMap { _ =>
2✔
103
      latestProcessVersionsNoJsonQuery(action.processName).result.headOption.flatMap {
2✔
104
        case Some(_) => DBIOAction.successful(ProcessAlreadyExists(action.processName.value).asLeft)
2✔
105
        case None => processesTable.filter(_.name === action.processName).result.headOption.flatMap {
2✔
106
          case Some(_) => DBIOAction.successful(ProcessAlreadyExists(action.processName.value).asLeft)
×
107
          case None => (insertNew += processToSave)
108
            .flatMap(entity => updateProcessInternal(entity.id, action.canonicalProcess, increaseVersionWhenJsonNotChanged = false, userName = userName))
2✔
109
            .map(_.map(res => res.newVersion.map(ProcessCreated(res.processId, _))))
2✔
110
        }
111
      }
112
    }
113

114
    insertAction
115
  }
116

117
  def updateProcess(updateProcessAction: UpdateProcessAction)(implicit loggedUser: LoggedUser): DB[XError[ProcessUpdated]] = {
118
    val userName = updateProcessAction.forwardedUserName.map(_.display).getOrElse(loggedUser.username)
1✔
119

120
    def addNewCommentToVersion(processId: ProcessId, versionId: VersionId) = {
121
      newCommentAction(processId, versionId, updateProcessAction.comment)
2✔
122
    }
123

124
    updateProcessInternal(updateProcessAction.id, updateProcessAction.canonicalProcess, updateProcessAction.increaseVersionWhenJsonNotChanged, userName).flatMap {
2✔
125
      // Comment should be added via ProcessService not to mix this repository responsibility.
126
      case updateProcessRes@Right(ProcessUpdated(processId, _, Some(newVersion))) =>
127
        addNewCommentToVersion(processId, newVersion).map(_ => updateProcessRes)
2✔
128
      case updateProcessRes@Right(ProcessUpdated(processId, Some(oldVersion), _)) =>
129
        addNewCommentToVersion(processId, oldVersion).map(_ => updateProcessRes)
2✔
130
      case a => DBIO.successful(a)
×
131
    }
132
  }
133

134
  private def updateProcessInternal(processId: ProcessId, canonicalProcess: CanonicalProcess, increaseVersionWhenJsonNotChanged: Boolean, userName: String)(implicit loggedUser: LoggedUser): DB[XError[ProcessUpdated]] = {
135
    def createProcessVersionEntityData(version: VersionId, processingType: ProcessingType) = ProcessVersionEntityData(
2✔
136
      id = version,
137
      processId = processId,
138
      json = Some(canonicalProcess),
2✔
139
      createDate = Timestamp.from(now),
2✔
140
      user = userName,
141
      modelVersion = modelVersion.forType(processingType),
2✔
142
      componentsUsages = Some(ScenarioComponentsUsagesHelper.compute(canonicalProcess)),
2✔
143
    )
144

145
    def isLastVersionContainsSameProcess(lastVersion: ProcessVersionEntityData): Boolean =
146
      lastVersion.json.contains(canonicalProcess)
2✔
147

148
    def versionToInsert(latestProcessVersion: Option[ProcessVersionEntityData], processingType: ProcessingType) =
149
      Right(latestProcessVersion match {
2✔
150
        case Some(version) if isLastVersionContainsSameProcess(version) && !increaseVersionWhenJsonNotChanged =>
2✔
151
          None
2✔
152
        case Some(version) =>
153
          Some(createProcessVersionEntityData(version.id.increase, processingType))
2✔
154
        case _ =>
155
          Some(createProcessVersionEntityData(VersionId.initialVersionId, processingType))
2✔
156
      })
157

158
    //TODO: why EitherT.right doesn't infere properly here?
159
    def rightT[T](value: DB[T]): EitherT[DB, EspError, T] = EitherT[DB, EspError, T](value.map(Right(_)))
2✔
160

161
    val insertAction = for {
162
      _ <- rightT(logDebug(s"Updating scenario $processId by user $userName"))
2✔
163
      maybeProcess <- rightT(processTableFilteredByUser.filter(_.id === processId).result.headOption)
2✔
164
      process <- EitherT.fromEither[DB](Either.fromOption(maybeProcess, ProcessNotFoundError(processId.value.toString)))
2✔
165
      latestProcessVersion <- rightT(fetchProcessLatestVersionsQuery(processId)(ProcessShapeFetchStrategy.FetchDisplayable).result.headOption)
2✔
166
      newProcessVersion <- EitherT.fromEither(versionToInsert(latestProcessVersion, process.processingType))
2✔
167
      _ <- EitherT.right[EspError](newProcessVersion.map(processVersionsTable += _).getOrElse(dbMonad.pure(0)))
2✔
168
    } yield ProcessUpdated(process.id, oldVersion = latestProcessVersion.map(_.id), newVersion = newProcessVersion.map(_.id))
2✔
169
    insertAction.value
2✔
170
  }
171

172
  private def logDebug(s: String) = {
173
    dbMonad.pure(()).map(_ => logger.debug(s))
2✔
174
  }
175

176
  def deleteProcess(processId: ProcessId): DB[XError[Unit]] =
177
    processesTable.filter(_.id === processId).delete.map {
2✔
178
      case 0 => Left(ProcessNotFoundError(processId.value.toString))
×
179
      case 1 => Right(())
2✔
180
    }
181

182
  def archive(processId: ProcessId, isArchived: Boolean): DB[XError[Unit]] =
183
    processesTable.filter(_.id === processId).map(_.isArchived).update(isArchived).map {
2✔
184
      case 0 => Left(ProcessNotFoundError(processId.value.toString))
×
185
      case 1 => Right(())
2✔
186
    }
187

188
  //accessible only from initializing scripts so far
189
  def updateCategory(processId: ProcessId, category: String)(implicit loggedUser: LoggedUser): DB[XError[Unit]] =
190
    processesTable.filter(_.id === processId).map(_.processCategory).update(category).map {
2✔
191
      case 0 => Left(ProcessNotFoundError(processId.value.toString))
×
192
      case 1 => Right(())
2✔
193
    }
194

195
  def renameProcess(process: ProcessIdWithName, newName: ProcessName)(implicit loggedUser: LoggedUser): DB[XError[Unit]] = {
196
    def updateNameInSingleProcessVersion(processVersion: ProcessVersionEntityData, process: ProcessEntityData) = {
197
      processVersion.json match {
2✔
198
        case Some(json) =>
2✔
199
          val updatedProcess = json.copy(metaData = json.metaData.copy(id = newName.value))
2✔
200
          val updatedProcessVersion = processVersion.copy(json = Some(updatedProcess))
2✔
201
          processVersionsTableWithScenarioJson.filter(version => version.id === processVersion.id && version.processId === process.id)
202
            .update(updatedProcessVersion)
2✔
203
        case None => DBIO.successful(())
×
204
      }
205
    }
206

207
    val updateNameInProcessJson =
208
      processVersionsTableWithScenarioJson.filter(_.processId === process.id)
2✔
209
        .join(processesTable)
2✔
210
        .on { case (version, process) => version.processId === process.id }
2✔
211
        .result.flatMap { processVersions =>
2✔
212
        DBIO.seq(processVersions.map((updateNameInSingleProcessVersion _).tupled): _*)
2✔
213
      }
214

215
    val updateNameInProcess =
216
      processesTable.filter(_.id === process.id).map(_.name).update(newName)
2✔
217

218
    // Comment relates to specific version (in this case last version). Last version could be extracted in one of the
219
    // above queries, but for sake of readability we perform separate query for this matter
220
    // TODO: remove this comment in favour of process-audit-log
221
    val addCommentAction = processVersionsTableWithUnit
222
      .filter(_.processId === process.id)
2✔
223
      .sortBy(_.id.desc)
2✔
224
      .result.headOption.flatMap {
2✔
225
      case Some(version) => newCommentAction(process.id, version.id, Some(UpdateProcessComment(s"Rename: [${process.name.value}] -> [$newName]")))
2✔
226
      case None => DBIO.successful(())
×
227
    }
228

229
    val action = processesTable.filter(_.name === newName).result.headOption.flatMap {
2✔
230
      case Some(_) => DBIO.successful(ProcessAlreadyExists(newName.value).asLeft)
2✔
231
      case None =>
232
        DBIO.seq[Effect.All](
233
          updateNameInProcess,
234
          updateNameInProcessJson,
235
          addCommentAction
236
        ).map(_ => ().asRight).transactionally
2✔
237
    }
238

239
    action
240
  }
241

242
  //to override in tests
243
  protected def now: Instant = Instant.now()
2✔
244

245
  //We use it only on tests..
246
  def changeVersionId(processId: ProcessId, versionId: VersionId, versionIdToUpdate: VersionId) =
247
    processVersionsTableWithUnit
248
      .filter(v => v.id === versionId && v.processId === processId)
249
      .map(_.id)
250
      .update(versionIdToUpdate)
2✔
251
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc