• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

TouK / nussknacker / 5976637142

25 Aug 2023 01:43PM UTC coverage: 81.47% (+0.03%) from 81.438%
5976637142

push

github

Filemon279
Fix migration in 1.11

25 of 25 new or added lines in 2 files covered. (100.0%)

14865 of 18246 relevant lines covered (81.47%)

5.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.61
/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/AkkaHttpBasedRouteProvider.scala
1
package pl.touk.nussknacker.ui.server
2

3
import akka.actor.ActorSystem
4
import akka.http.scaladsl.server.{Directives, Route}
5
import akka.stream.Materializer
6
import cats.effect.{ContextShift, IO, Resource}
7
import com.typesafe.config.Config
8
import net.ceedubs.ficus.Ficus._
9
import net.ceedubs.ficus.readers.ArbitraryTypeReader.arbitraryTypeValueReader
10
import com.typesafe.scalalogging.LazyLogging
11
import io.dropwizard.metrics5.MetricRegistry
12
import pl.touk.nussknacker.engine.dict.ProcessDictSubstitutor
13
import pl.touk.nussknacker.engine.util.loader.ScalaServiceLoader
14
import pl.touk.nussknacker.engine.util.multiplicity.{Empty, Many, Multiplicity, One}
15
import pl.touk.nussknacker.engine.{CombinedProcessingTypeData, ConfigWithUnresolvedVersion, ProcessingTypeData}
16
import pl.touk.nussknacker.processCounts.influxdb.InfluxCountsReporterCreator
17
import pl.touk.nussknacker.processCounts.{CountsReporter, CountsReporterCreator}
18
import pl.touk.nussknacker.ui.api._
19
import pl.touk.nussknacker.ui.component.DefaultComponentService
20
import pl.touk.nussknacker.ui.config.{AnalyticsConfig, AttachmentsConfig, ComponentLinksConfigExtractor, FeatureTogglesConfig, UsageStatisticsReportsConfig}
21
import pl.touk.nussknacker.ui.db.DbRef
22
import pl.touk.nussknacker.ui.factory.ProcessingTypeDataProviderFactory
23
import pl.touk.nussknacker.ui.initialization.Initialization
24
import pl.touk.nussknacker.ui.listener.ProcessChangeListenerLoader
25
import pl.touk.nussknacker.ui.listener.services.NussknackerServices
26
import pl.touk.nussknacker.ui.metrics.RepositoryGauges
27
import pl.touk.nussknacker.ui.notifications.{NotificationConfig, NotificationServiceImpl}
28
import pl.touk.nussknacker.ui.process.deployment._
29
import pl.touk.nussknacker.ui.process.fragment.{DbFragmentRepository, FragmentResolver}
30
import pl.touk.nussknacker.ui.process.migrate.{HttpRemoteEnvironment, TestModelMigrations}
31
import pl.touk.nussknacker.ui.process.processingtypedata.{BasicProcessingTypeDataReload, Initialization, ProcessingTypeDataProvider, ProcessingTypeDataReload}
32
import pl.touk.nussknacker.ui.process.repository._
33
import pl.touk.nussknacker.ui.process.test.ScenarioTestService
34
import pl.touk.nussknacker.ui.process._
35
import pl.touk.nussknacker.ui.processreport.ProcessCounter
36
import pl.touk.nussknacker.ui.security.api.{AuthenticationConfiguration, AuthenticationResources, LoggedUser}
37
import pl.touk.nussknacker.ui.statistics.UsageStatisticsReportsSettingsDeterminer
38
import pl.touk.nussknacker.ui.suggester.ExpressionSuggester
39
import pl.touk.nussknacker.ui.uiresolving.UIProcessResolving
40
import pl.touk.nussknacker.ui.util.{CorsSupport, OptionsMethodSupport, SecurityHeadersSupport, WithDirectives}
41
import pl.touk.nussknacker.ui.validation.ProcessValidation
42
import sttp.client3.SttpBackend
43
import sttp.client3.asynchttpclient.future.AsyncHttpClientFutureBackend
44

45
import java.util.concurrent.atomic.AtomicReference
46
import java.util.function.Supplier
47
import scala.concurrent.{ExecutionContext, Future}
48
import scala.util.Try
49
import scala.util.control.NonFatal
50

51
class AkkaHttpBasedRouteProvider(dbRef: DbRef,
52
                                 metricsRegistry: MetricRegistry,
53
                                 processingTypeDataProviderFactory: ProcessingTypeDataProviderFactory)
54
                                (implicit system: ActorSystem,
55
                                 materializer: Materializer)
56
  extends RouteProvider[Route]
57
    with Directives
58
    with LazyLogging {
59

60
  override def createRoute(config: ConfigWithUnresolvedVersion): Resource[IO, Route] = {
61
    import system.dispatcher
62
    for {
63
      sttpBackend <- createSttpBackend()
2✔
64
      resolvedConfig = config.resolved
2✔
65
      environment = resolvedConfig.getString("environment")
2✔
66
      featureTogglesConfig = FeatureTogglesConfig.create(resolvedConfig)
2✔
67
      _ = logger.info(s"Designer config loaded: \nfeatureTogglesConfig: $featureTogglesConfig")
68
      countsReporter <- createCountsReporter(featureTogglesConfig, environment, sttpBackend)
2✔
69
      deploymentServiceSupplier = new DelayedInitDeploymentServiceSupplier
2✔
70
      processCategoryService = new ConfigProcessCategoryService(resolvedConfig)
2✔
71
      typeToConfigAndReload <- prepareProcessingTypeData(
2✔
72
        config,
73
        deploymentServiceSupplier,
74
        processCategoryService,
75
        processingTypeDataProviderFactory,
2✔
76
        sttpBackend
77
      )
78
      (typeToConfig, reload) = typeToConfigAndReload
2✔
79
    } yield {
2✔
80
      val stateDefinitionService = new ProcessStateDefinitionService(
2✔
81
        typeToConfig.mapCombined(_.statusNameToStateDefinitionsMapping),
1✔
82
        processCategoryService
83
      )
84

85
      val analyticsConfig = AnalyticsConfig(resolvedConfig)
2✔
86

87
      val modelData = typeToConfig.mapValues(_.modelData)
2✔
88

89
      val managers = typeToConfig.mapValues(_.deploymentManager)
2✔
90

91
      val fragmentRepository = new DbFragmentRepository(dbRef, system.dispatcher)
2✔
92
      val fragmentResolver = new FragmentResolver(fragmentRepository)
2✔
93

94
      val additionalProperties = typeToConfig.mapValues(_.additionalPropertiesConfig)
2✔
95
      val processValidation = ProcessValidation(
2✔
96
        modelData,
97
        additionalProperties,
98
        typeToConfig.mapValues(_.additionalValidators),
2✔
99
        fragmentResolver
100
      )
101

102
      val substitutorsByProcessType = modelData.mapValues(modelData => ProcessDictSubstitutor(modelData.uiDictServices.dictRegistry))
2✔
103
      val processResolving = new UIProcessResolving(processValidation, substitutorsByProcessType)
2✔
104

105
      val dbioRunner = DBIOActionRunner(dbRef)
2✔
106
      val actionRepository = DbProcessActionRepository.create(dbRef, modelData)
2✔
107
      val processRepository = DBFetchingProcessRepository.create(dbRef, actionRepository)
2✔
108
      // TODO: get rid of Future based repositories - it is easier to use everywhere one implementation - DBIOAction based which allows transactions handling
109
      val futureProcessRepository = DBFetchingProcessRepository.createFutureRepository(dbRef, actionRepository)
2✔
110
      val writeProcessRepository = ProcessRepository.create(dbRef, modelData)
2✔
111

112
      val notificationsConfig = resolvedConfig.as[NotificationConfig]("notifications")
2✔
113
      val processChangeListener = ProcessChangeListenerLoader.loadListeners(
2✔
114
        getClass.getClassLoader,
2✔
115
        resolvedConfig,
116
        NussknackerServices(new PullProcessRepository(futureProcessRepository))
2✔
117
      )
118

119
      val scenarioResolver = new ScenarioResolver(fragmentResolver)
2✔
120
      val dmDispatcher = new DeploymentManagerDispatcher(managers, futureProcessRepository)
2✔
121

122
      val deploymentService = new DeploymentServiceImpl(dmDispatcher, processRepository, actionRepository, dbioRunner,
2✔
123
        processValidation, scenarioResolver, processChangeListener, featureTogglesConfig.scenarioStateTimeout)
2✔
124
      deploymentService.invalidateInProgressActions()
2✔
125

126
      deploymentServiceSupplier.set(deploymentService)
2✔
127

128
      // we need to init processing type data after deployment service creation to make sure that it will be done using
129
      // correct classloader and that won't cause further delays during handling requests
130
      reload.init()
2✔
131
      val processActivityRepository = new DbProcessActivityRepository(dbRef)
2✔
132

133
      val authenticationResources = AuthenticationResources(resolvedConfig, getClass.getClassLoader, sttpBackend)
2✔
134

135
      val counter = new ProcessCounter(fragmentRepository)
2✔
136

137
      Initialization.init(modelData.mapValues(_.migrations), dbRef, processRepository, environment)
2✔
138

139
      val newProcessPreparer = NewProcessPreparer(typeToConfig, additionalProperties)
2✔
140

141
      val customActionInvokerService = new CustomActionInvokerServiceImpl(
2✔
142
        futureProcessRepository,
143
        dmDispatcher,
144
        deploymentService
145
      )
146
      val testExecutorService = new ScenarioTestExecutorServiceImpl(scenarioResolver, dmDispatcher)
2✔
147
      val processService = new DBProcessService(deploymentService, newProcessPreparer,
2✔
148
        processCategoryService, processResolving, dbioRunner, futureProcessRepository, actionRepository,
149
        writeProcessRepository, processValidation
150
      )
151
      val scenarioTestService = ScenarioTestService(modelData, featureTogglesConfig.testDataSettings,
2✔
152
        processResolving, counter, testExecutorService)
153

154
      val configProcessToolbarService = new ConfigProcessToolbarService(
2✔
155
        resolvedConfig,
156
        processCategoryService.getAllCategories
2✔
157
      )
158

159
      val processAuthorizer = new AuthorizeProcess(futureProcessRepository)
2✔
160
      val appResources = new AppResources(
2✔
161
        config = resolvedConfig,
162
        processingTypeDataReload = reload,
163
        modelData = modelData,
164
        processRepository = futureProcessRepository,
165
        processValidation = processValidation,
166
        deploymentService = deploymentService,
167
        exposeConfig = featureTogglesConfig.enableConfigEndpoint,
2✔
168
        processCategoryService = processCategoryService
169
      )
170

171
      val componentService = DefaultComponentService(
2✔
172
        ComponentLinksConfigExtractor.extract(resolvedConfig),
2✔
173
        typeToConfig.mapCombined(_.componentIdProvider),
2✔
174
        processService,
175
        processCategoryService
176
      )
177

178
      val notificationService = new NotificationServiceImpl(actionRepository, dbioRunner, notificationsConfig)
2✔
179

180
      initMetrics(metricsRegistry, resolvedConfig, futureProcessRepository)
2✔
181

182
      val apiResourcesWithAuthentication: List[RouteWithUser] = {
183
        val routes = List(
2✔
184
          new ProcessesResources(
2✔
185
            processRepository = futureProcessRepository,
186
            processService = processService,
187
            deploymentService = deploymentService,
188
            processToolbarService = configProcessToolbarService,
189
            processResolving = processResolving,
190
            processAuthorizer = processAuthorizer,
191
            processChangeListener = processChangeListener
192
          ),
193
          new NodesResources(
2✔
194
            futureProcessRepository,
195
            fragmentRepository,
196
            typeToConfig.mapValues(_.modelData),
2✔
197
            processValidation,
198
            typeToConfig.mapValues(v => ExpressionSuggester(v.modelData))
1✔
199
          ),
200
          new ProcessesExportResources(futureProcessRepository, processActivityRepository, processResolving),
2✔
201
          new ProcessActivityResource(processActivityRepository, futureProcessRepository, processAuthorizer),
2✔
202
          new ManagementResources(
2✔
203
            processAuthorizer,
204
            futureProcessRepository,
205
            featureTogglesConfig.deploymentCommentSettings,
2✔
206
            deploymentService,
207
            dmDispatcher,
208
            customActionInvokerService,
209
            metricsRegistry,
2✔
210
            scenarioTestService,
211
            typeToConfig.mapValues(_.modelData)
2✔
212
          ),
213
          new ValidationResources(futureProcessRepository, processResolving),
2✔
214
          new DefinitionResources(modelData, typeToConfig, fragmentRepository, processCategoryService),
2✔
215
          new UserResources(processCategoryService),
2✔
216
          new NotificationResources(notificationService),
2✔
217
          appResources,
218
          new TestInfoResources(processAuthorizer, futureProcessRepository, scenarioTestService),
2✔
219
          new ComponentResource(componentService),
2✔
220
          new AttachmentResources(
2✔
221
            new ProcessAttachmentService(
2✔
222
              AttachmentsConfig.create(resolvedConfig),
2✔
223
              processActivityRepository
224
            ),
225
            futureProcessRepository,
226
            processAuthorizer
227
          ),
228
          new StatusResources(stateDefinitionService),
2✔
229
        )
230

231
        val optionalRoutes = List(
232
          featureTogglesConfig.remoteEnvironment
233
            .map(migrationConfig => new HttpRemoteEnvironment(
×
234
              migrationConfig,
235
              new TestModelMigrations(modelData.mapValues(_.migrations), processValidation),
×
236
              environment
237
            ))
238
            .map { remoteEnvironment =>
2✔
239
              new RemoteEnvironmentResources(remoteEnvironment, futureProcessRepository, processAuthorizer)
×
240
            },
241
          countsReporter.map(reporter => new ProcessReportResources(reporter, counter, futureProcessRepository)),
1✔
242
        ).flatten
2✔
243
        routes ++ optionalRoutes
2✔
244
      }
245

246
      val usageStatisticsReportsConfig = resolvedConfig.as[UsageStatisticsReportsConfig]("usageStatisticsReports")
2✔
247
      val usageStatisticsReportsSettingsDeterminer = UsageStatisticsReportsSettingsDeterminer(usageStatisticsReportsConfig, typeToConfig.mapValues(_.usageStatistics))
×
248

249
      //TODO: WARNING now all settings are available for not sign in user. In future we should show only basic settings
250
      val settingsResources = new SettingsResources(
2✔
251
        featureTogglesConfig,
252
        authenticationResources.name,
2✔
253
        analyticsConfig,
254
        usageStatisticsReportsSettingsDeterminer.determineSettings()
×
255
      )
256
      val apiResourcesWithoutAuthentication: List[Route] = List(
2✔
257
        settingsResources.publicRoute(),
2✔
258
        appResources.publicRoute(),
2✔
259
        authenticationResources.routeWithPathPrefix,
260
      )
261

262
      createAppRoute(
2✔
263
        resolvedConfig = resolvedConfig,
264
        authenticationResources = authenticationResources,
265
        apiResourcesWithAuthentication = apiResourcesWithAuthentication,
266
        apiResourcesWithoutAuthentication = apiResourcesWithoutAuthentication,
267
        processCategoryService = processCategoryService,
268
        developmentMode = featureTogglesConfig.development
2✔
269
      )
270
    }
271
  }
272

273
  private def createSttpBackend()(implicit executionContext: ExecutionContext) = {
274
    implicit val contextShift: ContextShift[IO] = IO.contextShift(executionContext)
2✔
275
    Resource
276
      .make(
277
        acquire = IO(AsyncHttpClientFutureBackend.usingConfigBuilder(identity))
2✔
278
      )(
2✔
279
        release = backend => IO.fromFuture(IO(backend.close()))
2✔
280
      )
281
  }
282

283
  private def initMetrics(metricsRegistry: MetricRegistry,
284
                          config: Config,
285
                          processRepository: DBFetchingProcessRepository[Future] with BasicRepository): Unit = {
286
    new RepositoryGauges(metricsRegistry, config.getDuration("repositoryGaugesCacheDuration"), processRepository).prepareGauges()
2✔
287
  }
288

289
  private def createAppRoute(resolvedConfig: Config,
290
                             authenticationResources: AuthenticationResources,
291
                             apiResourcesWithAuthentication: List[RouteWithUser],
292
                             apiResourcesWithoutAuthentication: List[Route],
293
                             processCategoryService: ProcessCategoryService,
294
                             developmentMode: Boolean)
295
                            (implicit executionContext: ExecutionContext): Route = {
296
    //TODO: In the future will be nice to have possibility to pass authenticator.directive to resource and there us it at concrete path resource
297
    val webResources = new WebResources(resolvedConfig.getString("http.publicPath"))
2✔
298
    WithDirectives(CorsSupport.cors(developmentMode), SecurityHeadersSupport(), OptionsMethodSupport()) {
2✔
299
      pathPrefixTest(!"api") {
2✔
300
        webResources.route
×
301
      } ~ pathPrefix("api") {
2✔
302
        apiResourcesWithoutAuthentication.reduce(_ ~ _)
2✔
303
      } ~ authenticationResources.authenticate() { authenticatedUser =>
2✔
304
        pathPrefix("api") {
2✔
305
          authorize(authenticatedUser.roles.nonEmpty) {
2✔
306
            val loggedUser = LoggedUser(
2✔
307
              authenticatedUser = authenticatedUser,
308
              rules = AuthenticationConfiguration.getRules(resolvedConfig),
2✔
309
              processCategories = processCategoryService.getAllCategories
2✔
310
            )
311
            apiResourcesWithAuthentication.map(_.securedRoute(loggedUser)).reduce(_ ~ _)
2✔
312
          }
313
        }
314
      }
315
    }
316
  }
317

318
  private def createCountsReporter(featureTogglesConfig: FeatureTogglesConfig,
319
                                   environment: String,
320
                                   backend: SttpBackend[Future, Any]) = {
321

322
    featureTogglesConfig.counts match {
2✔
323
      case Some(config) => prepareCountsReporter(environment, config, backend)
×
324
      case None => Resource.pure[IO, None.type](None)
2✔
325
    }
326
  }
327

328
  //by default, we use InfluxCountsReporterCreator
329
  private def prepareCountsReporter(env: String,
330
                                    config: Config,
331
                                    backend: SttpBackend[Future, Any]): Resource[IO, Option[CountsReporter[Future]]] = {
332
    Resource
333
      .make(
334
        acquire = IO {
×
335
          val configAtKey = config.atKey(CountsReporterCreator.reporterCreatorConfigPath)
×
336
          val creator = Multiplicity(ScalaServiceLoader.load[CountsReporterCreator](getClass.getClassLoader)) match {
×
337
            case One(cr) =>
338
              cr
×
339
            case Empty() =>
340
              new InfluxCountsReporterCreator
×
341
            case Many(many) =>
342
              throw new IllegalArgumentException(s"Many CountsReporters found: ${many.mkString(", ")}")
×
343
          }
344

345
          Try(Option(creator.createReporter(env, configAtKey)(backend)))
×
346
            .recover { case NonFatal(ex) =>
×
347
              logger.warn(s"Error while setting up counts mechanism: ${ex.getMessage}. Counts mechanism will be disabled.")
348
              None
×
349
            }
350
            .get
×
351
        }
352
      )(
×
353
        release = counter => IO(counter.foreach(_.close()))
×
354
      )
355
  }
356

357
  private def prepareProcessingTypeData(designerConfig: ConfigWithUnresolvedVersion,
358
                                        deploymentServiceSupplier: Supplier[DeploymentService],
359
                                        categoriesService: ProcessCategoryService,
360
                                        processingTypeDataProviderFactory: ProcessingTypeDataProviderFactory,
361
                                        sttpBackend: SttpBackend[Future, Any])
362
                                       (implicit executionContext: ExecutionContext): Resource[IO, (ProcessingTypeDataProvider[ProcessingTypeData, CombinedProcessingTypeData], ProcessingTypeDataReload with Initialization)] = {
363
    implicit val sttpBackendImplicit: SttpBackend[Future, Any] = sttpBackend
364
    Resource
365
      .make(
366
        acquire = IO(BasicProcessingTypeDataReload.wrapWithReloader(
2✔
367
          () => processingTypeDataProviderFactory.create(designerConfig, deploymentServiceSupplier, categoriesService)
2✔
368
        ))
369
      )(
2✔
370
        release = provider => IO {
2✔
371
          val (processingTypeDataProvider, _) = provider
2✔
372
          processingTypeDataProvider.all.values.foreach(_.close())
2✔
373
        }
374
      )
375
  }
376

377
  private class DelayedInitDeploymentServiceSupplier extends Supplier[DeploymentService] {
378
    private val deploymentServiceRef = new AtomicReference[Option[DeploymentService]](None)
2✔
379

380
    override def get(): DeploymentService = {
381
      val deploymentService = deploymentServiceRef.get()
2✔
382
      deploymentService.getOrElse(throw new IllegalStateException(
×
383
        "Illegal initialization: DeploymentService should be initialized before ProcessingTypeData"
384
      ))
385
    }
386

387
    def set(deploymentService: DeploymentService): Unit = deploymentServiceRef.set(Some(deploymentService))
2✔
388
  }
389

390
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc