JohnSnowLabs / spark-nlp / 10656334014

01 Sep 2024 06:19PM UTC. Coverage: 62.392% (-0.02% from 62.41%)

Pull Request #14355: Implementing Mxbai Embeddings (github / web-flow)
Merge 2a3ee298b into 50a69662f

0 of 2 new or added lines in 1 file covered. (0.0%)
27 existing lines in 7 files now uncovered.
8967 of 14372 relevant lines covered (62.39%)
0.62 hits per line

Source File: /src/main/scala/com/johnsnowlabs/nlp/util/io/ResourceHelper.scala (47.65% covered)
/*
 * Copyright 2017-2022 John Snow Labs
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.johnsnowlabs.nlp.util.io

import com.amazonaws.AmazonServiceException
import com.johnsnowlabs.client.CloudResources
import com.johnsnowlabs.client.aws.AWSGateway
import com.johnsnowlabs.client.util.CloudHelper
import com.johnsnowlabs.nlp.annotators.Tokenizer
import com.johnsnowlabs.nlp.annotators.common.{TaggedSentence, TaggedWord}
import com.johnsnowlabs.nlp.util.io.ReadAs._
import com.johnsnowlabs.nlp.{DocumentAssembler, Finisher}
import com.johnsnowlabs.util.ConfigHelper
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

import java.io._
import java.net.{URI, URL, URLDecoder}
import java.nio.file
import java.nio.file.{Files, Paths}
import java.util.jar.JarFile
import scala.collection.mutable.{ArrayBuffer, Map => MMap}
import scala.io.BufferedSource
import scala.util.{Failure, Success, Try}

/** One-stop helper for IO management. Streams, sources and external input should be handled
  * from here.
  */
object ResourceHelper {

  def getActiveSparkSession: SparkSession =
    SparkSession.getActiveSession.getOrElse(
      SparkSession
        .builder()
        .appName("SparkNLP Default Session")
        .master("local[*]")
        .config("spark.driver.memory", "22G")
        .config("spark.driver.maxResultSize", "0")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.kryoserializer.buffer.max", "1000m")
        .getOrCreate())

  def getSparkSessionWithS3(
      awsAccessKeyId: String,
      awsSecretAccessKey: String,
      hadoopAwsVersion: String = ConfigHelper.hadoopAwsVersion,
      AwsJavaSdkVersion: String = ConfigHelper.awsJavaSdkVersion,
      region: String = "us-east-1",
      s3Impl: String = "org.apache.hadoop.fs.s3a.S3AFileSystem",
      pathStyleAccess: Boolean = true,
      credentialsProvider: String = "TemporaryAWSCredentialsProvider",
      awsSessionToken: Option[String] = None): SparkSession = {

    require(
      SparkSession.getActiveSession.isEmpty,
      "Spark session already running, can't apply new configuration for S3.")

    val sparkSession = SparkSession
      .builder()
      .appName("SparkNLP Session with S3 Support")
      .master("local[*]")
      .config("spark.driver.memory", "22G")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.kryoserializer.buffer.max", "1000M")
      .config("spark.driver.maxResultSize", "0")
      .config("spark.hadoop.fs.s3a.access.key", awsAccessKeyId)
      .config("spark.hadoop.fs.s3a.secret.key", awsSecretAccessKey)
      .config(ConfigHelper.awsExternalRegion, region)
      .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        s"org.apache.hadoop.fs.s3a.$credentialsProvider")
      .config("spark.hadoop.fs.s3a.impl", s3Impl)
      .config(
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-aws:" + hadoopAwsVersion + ",com.amazonaws:aws-java-sdk:" + AwsJavaSdkVersion)
      .config("spark.hadoop.fs.s3a.path.style.access", pathStyleAccess.toString)

    if (credentialsProvider == "TemporaryAWSCredentialsProvider") {
      require(
        awsSessionToken.isDefined,
        "AWS Session token needs to be provided for TemporaryAWSCredentialsProvider.")
      sparkSession.config("spark.hadoop.fs.s3a.session.token", awsSessionToken.get)
    }

    sparkSession.getOrCreate()
  }
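
  /* Usage sketch (illustrative, not part of the original file): how getSparkSessionWithS3 might be
   * called to start a session that can read s3a:// paths. The bucket and the environment variable
   * names below are placeholders; a session token is required here because the default
   * credentialsProvider is TemporaryAWSCredentialsProvider.
   *
   *   val session = ResourceHelper.getSparkSessionWithS3(
   *     awsAccessKeyId = sys.env("AWS_ACCESS_KEY_ID"),
   *     awsSecretAccessKey = sys.env("AWS_SECRET_ACCESS_KEY"),
   *     awsSessionToken = sys.env.get("AWS_SESSION_TOKEN"))
   *   session.read.text("s3a://my-bucket/some/prefix").show()
   */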

  lazy val spark: SparkSession = getActiveSparkSession

  /** Structure for a SourceStream coming from compiled content */
  case class SourceStream(resource: String) {

    var fileSystem: Option[FileSystem] = None
    private val (pathExists: Boolean, path: Option[Path]) = OutputHelper.doesPathExists(resource)
    if (!pathExists) {
      throw new FileNotFoundException(s"file or folder: $resource not found")
    } else {
      fileSystem = Some(OutputHelper.getFileSystem(resource))
    }

    val pipe: Seq[InputStream] = getPipe(fileSystem.get)
    private val openBuffers: Seq[BufferedSource] = pipe.map(pp => {
      new BufferedSource(pp)("UTF-8")
    })
    val content: Seq[Iterator[String]] = openBuffers.map(c => c.getLines())

    private def getPipe(fileSystem: FileSystem): Seq[InputStream] = {
      if (fileSystem.getScheme == "s3a") {
        val awsGateway = new AWSGateway()
        val (bucket, s3Path) = CloudHelper.parseS3URI(path.get.toString)
        val inputStreams = awsGateway.listS3Files(bucket, s3Path).map { summary =>
          val s3Object = awsGateway.getS3Object(bucket, summary.getKey)
          s3Object.getObjectContent
        }
        inputStreams
      } else {
        val files = fileSystem.listFiles(path.get, true)
        val buffer = ArrayBuffer.empty[InputStream]
        while (files.hasNext) buffer.append(fileSystem.open(files.next().getPath))
        buffer
      }
    }

    /** Copies the resource into a local temporary folder and returns the folder's URI.
      *
      * @param prefix
      *   Prefix for the temporary folder.
      * @return
      *   URI of the created temporary folder with the resource
      */
    def copyToLocal(prefix: String = "sparknlp_tmp_"): URI = {
      if (fileSystem.get.getScheme == "file")
        return URI.create(resource)

      val destination: file.Path = Files.createTempDirectory(prefix)

      val destinationUri = fileSystem.get.getScheme match {
        case "hdfs" =>
          fileSystem.get.copyToLocalFile(false, path.get, new Path(destination.toUri), true)
          if (fileSystem.get.getFileStatus(path.get).isDirectory)
            Paths.get(destination.toString, path.get.getName).toUri
          else destination.toUri
        case "dbfs" =>
          val dbfsPath = path.get.toString.replace("dbfs:/", "/dbfs/")
          val sourceFile = new File(dbfsPath)
          val targetFile = new File(destination.toString)
          if (sourceFile.isFile) FileUtils.copyFileToDirectory(sourceFile, targetFile)
          else FileUtils.copyDirectory(sourceFile, targetFile)
          targetFile.toURI
        case _ =>
          val files = fileSystem.get.listFiles(path.get, false)
          while (files.hasNext) {
            fileSystem.get.copyFromLocalFile(files.next.getPath, new Path(destination.toUri))
          }
          destination.toUri
      }

      destinationUri
    }

    def close(): Unit = {
      openBuffers.foreach(_.close())
      pipe.foreach(_.close)
    }
  }
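
  /* Usage sketch (illustrative, not part of the original file): reading a local text file through
   * SourceStream and then releasing the underlying streams. The path is a placeholder.
   *
   *   val stream = ResourceHelper.SourceStream("file:///tmp/dictionary.txt")
   *   val lines: Seq[String] = stream.content.flatten
   *   stream.close()
   */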

  private def fixTarget(path: String): String = {
    val toSearch =
      s"^.*target\\${File.separator}.*scala-.*\\${File.separator}.*classes\\${File.separator}"
    if (path.matches(toSearch + ".*")) {
      path.replaceFirst(toSearch, "")
    } else {
      path
    }
  }

  /** Copies the remote resource to a local temporary folder and returns its absolute path.
    *
    * Currently, file:/, s3:/, hdfs:/ and dbfs:/ are supported.
    *
    * If the file is already on the local file system, just the absolute path will be returned
    * instead.
    * @param path
    *   Path to the resource
    * @return
    *   Absolute path to the temporary or local folder of the resource
    */
  def copyToLocal(path: String): String = try {
    val localUri =
      if (CloudHelper.isCloudPath(path)) { // Download directly from Cloud Buckets
        CloudResources.downloadBucketToLocalTmp(path)
      } else { // Use Source Stream
        val pathWithProtocol: String =
          if (URI.create(path).getScheme == null) new File(path).toURI.toURL.toString else path
        val resource = SourceStream(pathWithProtocol)
        resource.copyToLocal()
      }

    new File(localUri).getAbsolutePath // Platform independent path
  } catch {
    case awsE: AmazonServiceException =>
      println("Error while retrieving folder from S3. Make sure you have set the right " +
        "access keys with proper permissions in your configuration. For an example please see " +
        "https://github.com/JohnSnowLabs/spark-nlp/blob/master/examples/python/training/english/dl-ner/mfa_ner_graphs_s3.ipynb")
      throw awsE
    case e: Exception =>
      val copyToLocalErrorMessage: String =
        "Please make sure the provided path exists and is accessible while keeping in mind only file:/, hdfs:/, dbfs:/ and s3:/ protocols are supported at the moment."
      println(
        s"$e \n Therefore, could not create temporary local directory for provided path $path. $copyToLocalErrorMessage")
      throw e
  }
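
  /* Usage sketch (illustrative, not part of the original file): staging a remote folder locally
   * before reading it with plain java.io. The S3 path is a placeholder; per the scaladoc above,
   * file:/, s3:/, hdfs:/ and dbfs:/ are the supported schemes.
   *
   *   val localPath: String = ResourceHelper.copyToLocal("s3://my-bucket/models/pos_ud")
   *   new File(localPath).listFiles().foreach(f => println(f.getName))
   */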

  /** NOT thread safe. Do not call from executors. */
  def getResourceStream(path: String): InputStream = {
    if (new File(path).exists())
      new FileInputStream(new File(path))
    else {
      Option(getClass.getResourceAsStream(path))
        .getOrElse {
          Option(getClass.getClassLoader.getResourceAsStream(path))
            .getOrElse(throw new IllegalArgumentException(f"Wrong resource path $path"))
        }
    }
  }

  def getResourceFile(path: String): URL = {
    var dirURL = getClass.getResource(path)

    if (dirURL == null)
      dirURL = getClass.getClassLoader.getResource(path)

    dirURL
  }

  def listResourceDirectory(path: String): Seq[String] = {
    val dirURL = getResourceFile(path)

    if (dirURL != null && dirURL.getProtocol.equals("file") && new File(dirURL.toURI).exists()) {
      /* A file path: easy enough */
      return new File(dirURL.toURI).listFiles.sorted.map(_.getPath).map(fixTarget)
    } else if (dirURL == null) {
      /* path not in resources and not on disk */
      throw new FileNotFoundException(path)
    }

    if (dirURL.getProtocol.equals("jar")) {
      /* A JAR path */
      val jarPath =
        dirURL.getPath.substring(5, dirURL.getPath.indexOf("!")) // strip out only the JAR file
      val jar = new JarFile(URLDecoder.decode(jarPath, "UTF-8"))
      val entries = jar.entries()
      val result = new ArrayBuffer[String]()

      val pathToCheck = path
        .stripPrefix(File.separator.replaceAllLiterally("\\", "/"))
        .stripSuffix(File.separator) +
        File.separator.replaceAllLiterally("\\", "/")

      while (entries.hasMoreElements) {
        val name = entries.nextElement().getName.stripPrefix(File.separator)
        if (name.startsWith(pathToCheck)) { // filter according to the path
          var entry = name.substring(pathToCheck.length())
          val checkSubdir = entry.indexOf("/")
          if (checkSubdir >= 0) {
            // if it is a subdirectory, we just return the directory name
            entry = entry.substring(0, checkSubdir)
          }
          if (entry.nonEmpty) {
            result.append(pathToCheck + entry)
          }
        }
      }
      return result.distinct.sorted
    }

    throw new UnsupportedOperationException(s"Cannot list files for URL $dirURL")
  }

  /** General purpose key-value parser from a source. Currently reads only text files.
    *
    * @return
    */
  def parseKeyValueText(er: ExternalResource): Map[String, String] = {
    er.readAs match {
      case TEXT =>
        val sourceStream = SourceStream(er.path)
        val res = sourceStream.content
          .flatMap(c =>
            c.map(line => {
              val kv = line.split(er.options("delimiter"))
              (kv.head.trim, kv.last.trim)
            }))
          .toMap
        sourceStream.close()
        res
      case SPARK =>
        import spark.implicits._
        val dataset = spark.read
          .options(er.options)
          .format(er.options("format"))
          .options(er.options)
          .option("delimiter", er.options("delimiter"))
          .load(er.path)
          .toDF("key", "value")
        val keyValueStore = MMap.empty[String, String]
        dataset.as[(String, String)].foreach { kv =>
          keyValueStore(kv._1) = kv._2
        }
        keyValueStore.toMap
      case _ =>
        throw new Exception("Unsupported readAs")
    }
  }
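
  /* Usage sketch (illustrative, not part of the original file): parsing a key-value dictionary from
   * a plain text file where each line holds "key<delimiter>value". ExternalResource is assumed to
   * take a path, a ReadAs format and an options map, as it is used throughout this file; the path
   * is a placeholder.
   *
   *   val er = ExternalResource("/tmp/synonyms.txt", ReadAs.TEXT, Map("delimiter" -> " "))
   *   val dictionary: Map[String, String] = ResourceHelper.parseKeyValueText(er)
   */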

  def parseKeyListValues(externalResource: ExternalResource): Map[String, List[String]] = {
    externalResource.readAs match {
      case TEXT =>
        val sourceStream = SourceStream(externalResource.path)
        val keyValueStore = MMap.empty[String, List[String]]
        sourceStream.content.foreach(content =>
          content.foreach { line =>
            {
              val keyValues = line.split(externalResource.options("delimiter"))
              val key = keyValues.head
              val value = keyValues.drop(1).toList
              val storedValue = keyValueStore.get(key)
              if (storedValue.isDefined && !storedValue.contains(value)) {
                keyValueStore.update(key, storedValue.get ++ value)
              } else keyValueStore(key) = value
            }
          })
        sourceStream.close()
        keyValueStore.toMap
    }
  }

  def parseKeyArrayValues(externalResource: ExternalResource): Map[String, Array[Float]] = {
    externalResource.readAs match {
      case TEXT =>
        val sourceStream = SourceStream(externalResource.path)
        val keyValueStore = MMap.empty[String, Array[Float]]
        sourceStream.content.foreach(content =>
          content.foreach { line =>
            {
              val keyValues = line.split(externalResource.options("delimiter"))
              val key = keyValues.head
              val value = keyValues.drop(1).map(x => x.toFloat)
              if (value.length > 1) {
                keyValueStore(key) = value
              }
            }
          })
        sourceStream.close()
        keyValueStore.toMap
    }
  }

  /** General purpose line parser from a source. Currently reads only text files.
    *
    * @return
    */
  def parseLines(er: ExternalResource): Array[String] = {
    er.readAs match {
      case TEXT =>
        val sourceStream = SourceStream(er.path)
        val res = sourceStream.content.flatten.toArray
        sourceStream.close()
        res
      case SPARK =>
        import spark.implicits._
        spark.read
          .options(er.options)
          .format(er.options("format"))
          .load(er.path)
          .as[String]
          .collect
      case _ =>
        throw new Exception("Unsupported readAs")
    }
  }

  /** General purpose line parser from a source. Currently reads only text files.
    *
    * @return
    */
  def parseLinesIterator(er: ExternalResource): Seq[Iterator[String]] = {
    er.readAs match {
      case TEXT =>
        val sourceStream = SourceStream(er.path)
        sourceStream.content
      case _ =>
        throw new Exception("Unsupported readAs")
    }
  }

  /** General purpose tuple parser from a source. Currently reads only text files.
    *
    * @return
    */
  def parseTupleText(er: ExternalResource): Array[(String, String)] = {
    er.readAs match {
      case TEXT =>
        val sourceStream = SourceStream(er.path)
        val res = sourceStream.content
          .flatMap(c =>
            c.filter(_.nonEmpty)
              .map(line => {
                val kv = line.split(er.options("delimiter")).map(_.trim)
                (kv.head, kv.last)
              }))
          .toArray
        sourceStream.close()
        res
      case SPARK =>
        import spark.implicits._
        val dataset = spark.read.options(er.options).format(er.options("format")).load(er.path)
        val lineStore = spark.sparkContext.collectionAccumulator[String]
        dataset.as[String].foreach(l => lineStore.add(l))
        val result = lineStore.value.toArray.map(line => {
          val kv = line.toString.split(er.options("delimiter")).map(_.trim)
          (kv.head, kv.last)
        })
        lineStore.reset()
        result
      case _ =>
        throw new Exception("Unsupported readAs")
    }
  }

  /** General purpose tuple parser from a source. Currently reads only text files.
    *
    * @return
    */
  def parseTupleSentences(er: ExternalResource): Array[TaggedSentence] = {
    er.readAs match {
      case TEXT =>
        val sourceStream = SourceStream(er.path)
        val result = sourceStream.content
          .flatMap(c =>
            c.filter(_.nonEmpty)
              .map(line => {
                line
                  .split("\\s+")
                  .filter(kv => {
                    val s = kv.split(er.options("delimiter").head)
                    s.length == 2 && s(0).nonEmpty && s(1).nonEmpty
                  })
                  .map(kv => {
                    val p = kv.split(er.options("delimiter").head)
                    TaggedWord(p(0), p(1))
                  })
              }))
          .toArray
        sourceStream.close()
        result.map(TaggedSentence(_))
      case SPARK =>
        import spark.implicits._
        val dataset = spark.read.options(er.options).format(er.options("format")).load(er.path)
        val result = dataset
          .as[String]
          .filter(_.nonEmpty)
          .map(line => {
            line
              .split("\\s+")
              .filter(kv => {
                val s = kv.split(er.options("delimiter").head)
                s.length == 2 && s(0).nonEmpty && s(1).nonEmpty
              })
              .map(kv => {
                val p = kv.split(er.options("delimiter").head)
                TaggedWord(p(0), p(1))
              })
          })
          .collect
        result.map(TaggedSentence(_))
      case _ =>
        throw new Exception("Unsupported readAs")
    }
  }

  def parseTupleSentencesDS(er: ExternalResource): Dataset[TaggedSentence] = {
    er.readAs match {
      case SPARK =>
        import spark.implicits._
        val dataset = spark.read.options(er.options).format(er.options("format")).load(er.path)
        val result = dataset
          .as[String]
          .filter(_.nonEmpty)
          .map(line => {
            line
              .split("\\s+")
              .filter(kv => {
                val s = kv.split(er.options("delimiter").head)
                s.length == 2 && s(0).nonEmpty && s(1).nonEmpty
              })
              .map(kv => {
                val p = kv.split(er.options("delimiter").head)
                TaggedWord(p(0), p(1))
              })
          })
        result.map(TaggedSentence(_))
      case _ =>
        throw new Exception(
          "Unsupported readAs. If you're training POS with large dataset, consider PerceptronApproachDistributed")
    }
  }

  /** For keys with multiple values, this optimizer flattens all values into keys to provide
    * constant-time access.
    */
  def flattenRevertValuesAsKeys(er: ExternalResource): Map[String, String] = {
    er.readAs match {
      case TEXT =>
        val m: MMap[String, String] = MMap()
        val sourceStream = SourceStream(er.path)
        sourceStream.content.foreach(c =>
          c.foreach(line => {
            val kv = line.split(er.options("keyDelimiter")).map(_.trim)
            if (kv.length > 1) {
              val key = kv(0)
              val values = kv(1).split(er.options("valueDelimiter")).map(_.trim)
              values.foreach(m(_) = key)
            }
          }))
        sourceStream.close()
        m.toMap
      case SPARK =>
        import spark.implicits._
        val dataset = spark.read.options(er.options).format(er.options("format")).load(er.path)
        val valueAsKeys = MMap.empty[String, String]
        dataset
          .as[String]
          .foreach(line => {
            val kv = line.split(er.options("keyDelimiter")).map(_.trim)
            if (kv.length > 1) {
              val key = kv(0)
              val values = kv(1).split(er.options("valueDelimiter")).map(_.trim)
              values.foreach(v => valueAsKeys(v) = key)
            }
          })
        valueAsKeys.toMap
      case _ =>
        throw new Exception("Unsupported readAs")
    }
  }
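
  /* Usage sketch (illustrative, not part of the original file): flattening a "key -> many values"
   * dictionary (for example a lemma dictionary) into a value -> key lookup. The path and the
   * delimiters are placeholders; only the keyDelimiter and valueDelimiter options are read in the
   * TEXT branch above.
   *
   *   val er = ExternalResource(
   *     "/tmp/lemmas.txt",
   *     ReadAs.TEXT,
   *     Map("keyDelimiter" -> "->", "valueDelimiter" -> "\t"))
   *   val lemmaByForm: Map[String, String] = ResourceHelper.flattenRevertValuesAsKeys(er)
   */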

  /** General purpose reader for saved DataFrames. Currently reads only the Parquet format.
    *
    * @return
    */
  def readSparkDataFrame(er: ExternalResource): DataFrame = {
    er.readAs match {
      case SPARK =>
        val dataset = spark.read.options(er.options).format(er.options("format")).load(er.path)
        dataset
      case _ =>
        throw new Exception("Unsupported readAs - only accepts SPARK")
    }
  }

  def getWordCount(
      externalResource: ExternalResource,
      wordCount: MMap[String, Long] = MMap.empty[String, Long].withDefaultValue(0),
      pipeline: Option[PipelineModel] = None): MMap[String, Long] = {
    externalResource.readAs match {
      case TEXT =>
        val sourceStream = SourceStream(externalResource.path)
        val regex = externalResource.options("tokenPattern").r
        sourceStream.content.foreach(c =>
          c.foreach { line =>
            {
              val words: List[String] = regex.findAllMatchIn(line).map(_.matched).toList
              words.foreach(w =>
                // Builds a map of word frequencies (word -> frequency) based on the ExternalResource
                wordCount(w) += 1)
            }
          })
        sourceStream.close()
        if (wordCount.isEmpty)
          throw new FileNotFoundException(
            "Word count dictionary for spell checker does not exist or is empty")
        wordCount
      case SPARK =>
        import spark.implicits._
        val dataset = spark.read
          .options(externalResource.options)
          .format(externalResource.options("format"))
          .load(externalResource.path)
        val transformation = {
          if (pipeline.isDefined) {
            pipeline.get.transform(dataset)
          } else {
            val documentAssembler = new DocumentAssembler()
              .setInputCol("value")
            val tokenizer = new Tokenizer()
              .setInputCols("document")
              .setOutputCol("token")
              .setTargetPattern(externalResource.options("tokenPattern"))
            val finisher = new Finisher()
              .setInputCols("token")
              .setOutputCols("finished")
              .setAnnotationSplitSymbol("--")
            new Pipeline()
              .setStages(Array(documentAssembler, tokenizer, finisher))
              .fit(dataset)
              .transform(dataset)
          }
        }
        val wordCount = MMap.empty[String, Long].withDefaultValue(0)
        transformation
          .select("finished")
          .as[String]
          .foreach(text =>
            text
              .split("--")
              .foreach(t => {
                wordCount(t) += 1
              }))
        wordCount
      case _ => throw new IllegalArgumentException("format not available for word count")
    }
  }
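
  /* Usage sketch (illustrative, not part of the original file): building a word-frequency map from
   * a text corpus, as used for spell-checker dictionaries. The path and the token pattern are
   * placeholders; only the tokenPattern option is read in the TEXT branch above.
   *
   *   val corpus = ExternalResource("/tmp/corpus.txt", ReadAs.TEXT, Map("tokenPattern" -> "\\S+"))
   *   val frequencies: MMap[String, Long] = ResourceHelper.getWordCount(corpus)
   *   frequencies.take(10).foreach { case (word, count) => println(s"$word -> $count") }
   */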

  def getFilesContentBuffer(externalResource: ExternalResource): Seq[Iterator[String]] = {
    externalResource.readAs match {
      case TEXT =>
        SourceStream(externalResource.path).content
      case _ =>
        throw new Exception("Unsupported readAs")
    }
  }

  def listLocalFiles(path: String): List[File] = {
    val fileSystem = OutputHelper.getFileSystem(path)

    val filesPath = fileSystem.getScheme match {
      case "hdfs" =>
        if (path.startsWith("file:")) {
          Option(new File(path.replace("file:", "")).listFiles())
        } else {
          try {
            val filesIterator = fileSystem.listFiles(new Path(path), false)
            val files: ArrayBuffer[File] = ArrayBuffer()

            while (filesIterator.hasNext) {
              val file = new File(filesIterator.next().getPath.toString)
              files.append(file)
            }

            Option(files.toArray)
          } catch {
            case _: FileNotFoundException =>
              Option(new File(path).listFiles())
          }

        }
      case "dbfs" if path.startsWith("dbfs:") =>
        Option(new File(path.replace("dbfs:", "/dbfs/")).listFiles())
      case _ => Option(new File(path).listFiles())
    }

    val files = filesPath.getOrElse(throw new FileNotFoundException(s"folder: $path not found"))
    files.toList
  }

  def getFileFromPath(pathToFile: String): File = {
    val fileSystem = OutputHelper.getFileSystem
    val filePath = fileSystem.getScheme match {
      case "hdfs" =>
        if (pathToFile.startsWith("file:")) {
          new File(pathToFile.replace("file:", ""))
        } else new File(pathToFile)
      case "dbfs" if pathToFile.startsWith("dbfs:") =>
        new File(pathToFile.replace("dbfs:", "/dbfs/"))
      case _ => new File(pathToFile)
    }

    filePath
  }

  def validFile(path: String): Boolean = {

    if (path.isEmpty) return false

    var isValid = validLocalFile(path) match {
      case Success(value) => value
      case Failure(_) => false
    }

    if (!isValid) {
      validHadoopFile(path) match {
        case Success(value) => isValid = value
        case Failure(_) => isValid = false
      }
    }

    if (!isValid) {
      validDbfsFile(path) match {
        case Success(value) => isValid = value
        case Failure(_) => isValid = false
      }
    }

    isValid
  }

  private def validLocalFile(path: String): Try[Boolean] = Try {
    Files.exists(Paths.get(path))
  }

  private def validHadoopFile(path: String): Try[Boolean] = Try {
    val hadoopPath = new Path(path)
    val fileSystem = OutputHelper.getFileSystem
    fileSystem.exists(hadoopPath)
  }

  private def validDbfsFile(path: String): Try[Boolean] = Try {
    getFileFromPath(path).exists()
  }

}
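
/* Usage sketch (illustrative, not part of the original file): checking that a user-supplied path
 * exists on any supported file system before listing it. The path is a placeholder.
 *
 *   if (ResourceHelper.validFile("/tmp/training_data")) {
 *     ResourceHelper.listLocalFiles("/tmp/training_data").foreach(f => println(f.getAbsolutePath))
 *   }
 */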