JohnSnowLabs / spark-nlp / 15252839065

26 May 2025 11:30AM UTC coverage: 52.115% (-0.6%) from 52.715%

Pull Request #14585: SparkNLP 1131 - Introducing Florance-2
Merge 625e5c10f into 56512b006

0 of 199 new or added lines in 4 files covered. (0.0%)
50 existing lines in 33 files now uncovered.
9931 of 19056 relevant lines covered (52.11%)
0.52 hits per line

Source file (82.89% covered):
/src/main/scala/com/johnsnowlabs/nlp/annotators/pos/perceptron/PerceptronApproachDistributed.scala
/*
 * Copyright 2017-2022 John Snow Labs
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.johnsnowlabs.nlp.annotators.pos.perceptron

import com.johnsnowlabs.nlp.annotators.common.{IndexedTaggedWord, TaggedSentence}
import com.johnsnowlabs.nlp.annotators.param.ExternalResourceParam
import com.johnsnowlabs.nlp.util.io.{ExternalResource, ReadAs, ResourceHelper}
import com.johnsnowlabs.nlp.{Annotation, AnnotatorApproach, AnnotatorType}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.param.{IntParam, Param}
import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable}
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.rand
import org.apache.spark.util.LongAccumulator

import scala.collection.mutable.{ListBuffer, Map => MMap}

/** Distributed Averaged Perceptron model to tag words part-of-speech.
  *
  * Sets a POS tag to each word within a sentence. Its training data (train_pos) is a Spark
  * dataset of POS-formatted values with Annotation columns.
  *
  * See
  * [[https://github.com/JohnSnowLabs/spark-nlp/blob/master/src/test/scala/com/johnsnowlabs/nlp/annotators/pos/perceptron/DistributedPos.scala]]
  * for further reference on how to use this API.
  *
  * @param uid
  *   internal uid required to generate writable annotators
  * @groupname anno Annotator types
  * @groupdesc anno
  *   Required input and expected output annotator types
  * @groupname Ungrouped Members
  * @groupname param Parameters
  * @groupname setParam Parameter setters
  * @groupname getParam Parameter getters
  * @groupprio param  1
  * @groupprio anno  2
  * @groupprio Ungrouped 3
  * @groupprio setParam  4
  * @groupprio getParam  5
  * @groupdesc param
  *   A list of (hyper-)parameter keys this annotator can take. Users can set and get the
  *   parameter values through setters and getters, respectively.
  */
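/* Example usage (a minimal sketch, not part of the original file): trains the distributed
 * POS tagger inside a Spark ML pipeline. It assumes hypothetical `documentAssembler` and
 * `tokenizer` stages that already produce the "document" and "token" columns, and a
 * hypothetical corpus file of "word|tag" pairs; adjust the path and delimiter to your data.
 *
 *   import org.apache.spark.ml.Pipeline
 *
 *   val posTagger = new PerceptronApproachDistributed()
 *     .setInputCols("document", "token")
 *     .setOutputCol("pos")
 *     .setCorpus("path/to/pos_corpus.txt", "|")
 *     .setNIterations(2)
 *
 *   val pipeline = new Pipeline().setStages(Array(documentAssembler, tokenizer, posTagger))
 *   val model = pipeline.fit(trainingData)
 */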
class PerceptronApproachDistributed(override val uid: String)
    extends AnnotatorApproach[PerceptronModel]
    with PerceptronTrainingUtils {

  import com.johnsnowlabs.nlp.AnnotatorType._

  /** Averaged Perceptron model to tag words part-of-speech */
  override val description: String = "Averaged Perceptron model to tag words part-of-speech"

  /** column of Array of POS tags that match tokens
    *
    * @group param
    */
  val posCol = new Param[String](this, "posCol", "column of Array of POS tags that match tokens")

  /** POS tags delimited corpus. Needs 'delimiter' in options
    *
    * @group param
    */
  val corpus = new ExternalResourceParam(
    this,
    "corpus",
    "POS tags delimited corpus. Needs 'delimiter' in options")

  /** Number of iterations in training, converges to better accuracy
    *
    * @group param
    */
  val nIterations = new IntParam(
    this,
    "nIterations",
    "Number of iterations in training, converges to better accuracy")

  setDefault(nIterations, 5)

  /** Column containing an array of POS Tags matching every token on the line.
    *
    * @group setParam
    */
  def setPosColumn(value: String): this.type = set(posCol, value)

  /** POS tags delimited corpus. Needs 'delimiter' in options
    *
    * @group setParam
    */
  def setCorpus(value: ExternalResource): this.type = {
    require(
      value.options.contains("delimiter"),
      "PerceptronApproach needs 'delimiter' in options to associate words with tags")
    set(corpus, value)
  }

  /** POS tags delimited corpus. Needs 'delimiter' in options
    *
    * @group setParam
    */
  def setCorpus(
      path: String,
      delimiter: String,
      readAs: ReadAs.Format = ReadAs.SPARK,
      options: Map[String, String] = Map("format" -> "text")): this.type =
    set(corpus, ExternalResource(path, readAs, options ++ Map("delimiter" -> delimiter)))

  /** Number of iterations for training. May improve accuracy but takes longer. Default 5.
    *
    * @group setParam
    */
  def setNIterations(value: Int): this.type = set(nIterations, value)

  def this() = this(Identifiable.randomUID("POS"))

  /** Output annotator types : POS
    *
    * @group anno
    */
  override val outputAnnotatorType: AnnotatorType = POS

  /** Input annotator types : TOKEN, DOCUMENT
    *
    * @group anno
    */
  override val inputAnnotatorTypes: Array[AnnotatorType] = Array(TOKEN, DOCUMENT)

  /** Finds very frequent tags for a word during training and marks that word as unambiguous,
    * based on tuning parameters. ToDo: Move such parameters to configuration.
    *
    * @param taggedSentences
    *   Takes entire tagged sentences to find frequent tags
    * @param frequencyThreshold
    *   Minimum number of times a word must occur for its dominant tag to be considered frequent
    * @param ambiguityThreshold
    *   Minimum fraction of a word's occurrences that must carry the same tag for the word to be
    *   marked as unambiguous
    */
  def buildTagBook(
      taggedSentences: Dataset[TaggedSentence],
      frequencyThreshold: Int = 20,
      ambiguityThreshold: Double = 0.97): Map[String, String] = {
    import ResourceHelper.spark.implicits._
    val tagFrequenciesByWord = taggedSentences
      .flatMap(_.taggedWords)
      .groupByKey(tw => tw.word.toLowerCase)
      .mapGroups { case (lw, tw) => (lw, tw.toSeq.groupBy(_.tag).mapValues(_.length)) }
      .filter { lwtw =>
        val (_, mode) = lwtw._2.maxBy(t => t._2)
        val n = lwtw._2.values.sum
        n >= frequencyThreshold && (mode / n.toDouble) >= ambiguityThreshold
      }

    tagFrequenciesByWord
      .map { case (word, tagFrequencies) =>
        val (tag, _) = tagFrequencies.maxBy(_._2)
        logger.debug(s"TRAINING: Ambiguity discarded on: << $word >> set to: << $tag >>")
        (word, tag)
      }
      .collect
      .toMap
  }
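
  /* Worked example (illustrative numbers, not from the source): with the defaults
   * frequencyThreshold = 20 and ambiguityThreshold = 0.97, a word seen 103 times with
   * tag counts Map("DT" -> 100, "IN" -> 3) passes both checks (103 >= 20 and
   * 100 / 103.0 ≈ 0.971 >= 0.97), so it enters the tag book as the unambiguous pair
   * "word" -> "DT". A word tagged Map("NN" -> 60, "VB" -> 40) fails (0.6 < 0.97) and
   * stays ambiguous, leaving its tag to be predicted by the perceptron at runtime.
   */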

  private[pos] def averageWeights(
      tags: Broadcast[Array[String]],
      taggedWordBook: Broadcast[Map[String, String]],
      featuresWeight: StringMapStringDoubleAccumulator,
      updateIteration: LongAccumulator,
      timetotals: TupleKeyLongDoubleMapAccumulator): AveragedPerceptron = {
    val fw = featuresWeight.value
    val uiv = updateIteration.value
    val totals = timetotals.value
    featuresWeight.reset()
    updateIteration.reset()
    timetotals.reset()
    val finalfw = fw.map { case (feature, weights) =>
      (
        feature,
        weights.map { case (tag, weight) =>
          val param = (feature, tag)
          val total = totals
            .get(param)
            .map(_._2)
            .getOrElse(0.0) + ((uiv - totals.get(param).map(_._1).getOrElse(0L)) * weight)
          (tag, total / uiv.toDouble)
        })
    }
    val apr = AveragedPerceptron(tags.value, taggedWordBook.value, finalfw)
    taggedWordBook.destroy()
    tags.destroy()
    apr
  }
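
  /* Worked example (illustrative numbers, not from the source): suppose a (feature, tag)
   * pair was last touched at update 40 with an accumulated total of 12.0, its current
   * weight is 0.5, and uiv = 100 updates were made in total. The pair's averaged weight
   * becomes (12.0 + (100 - 40) * 0.5) / 100 = 42.0 / 100 = 0.42, i.e. the weight each
   * update "held" is averaged over the whole training run (the classic lazy-averaging
   * trick of the averaged perceptron).
   */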

  /** Trains a model based on a provided CORPUS
    *
    * @return
    *   A trained averaged model
    */
  override def train(
      dataset: Dataset[_],
      recursivePipeline: Option[PipelineModel]): PerceptronModel = {

    val featuresWeightAcc = new StringMapStringDoubleAccumulator()
    val timeTotalsAcc = new TupleKeyLongDoubleMapAccumulator()
    val updateIterationAcc = new LongAccumulator()
    dataset.sparkSession.sparkContext.register(featuresWeightAcc)
    dataset.sparkSession.sparkContext.register(timeTotalsAcc)
    dataset.sparkSession.sparkContext.register(updateIterationAcc)

    /** Generates the TagBook, which holds all the word-to-tag mappings that are not ambiguous */
    val taggedSentences: Dataset[TaggedSentence] = if (get(posCol).isDefined) {
      import ResourceHelper.spark.implicits._
      val tokenColumn = dataset.schema.fields
        .find(f =>
          f.metadata.contains("annotatorType") && f.metadata
            .getString("annotatorType") == AnnotatorType.TOKEN)
        .map(_.name)
        .get
      dataset
        .select(tokenColumn, $(posCol))
        .as[(Array[Annotation], Array[String])]
        .map { case (annotations, posTags) =>
          lazy val strTokens = annotations.map(_.result).mkString("#")
          lazy val strPosTags = posTags.mkString("#")
          require(
            annotations.length == posTags.length,
            s"Cannot train from $posCol since there" +
              s" is a row with different amount of tags and tokens:\n$strTokens\n$strPosTags")
          TaggedSentence(
            annotations
              .zip(posTags)
              .map { case (annotation, posTag) =>
                IndexedTaggedWord(annotation.result, posTag, annotation.begin, annotation.end)
              })
        }
    } else {
      ResourceHelper.parseTupleSentencesDS($(corpus))
    }

    val nPartitions = $(corpus).options.get("repartition").map(_.toInt).getOrElse(0)
    val doCache = $(corpus).options.get("cache").exists(_.toBoolean == true)
    val repartitioned =
      if (nPartitions > 0 && nPartitions != taggedSentences.rdd.partitions.length)
        taggedSentences.repartition(nPartitions)
      else
        taggedSentences

    val cachedSentences =
      if (doCache)
        repartitioned.cache
      else
        repartitioned

    val taggedWordBook =
      dataset.sparkSession.sparkContext.broadcast(buildTagBook(taggedSentences))

    /** finds all distinct tags and stores them */
    val classes = {
      import ResourceHelper.spark.implicits._
      dataset.sparkSession.sparkContext
        .broadcast(taggedSentences.flatMap(_.tags).distinct.collect)
    }

    /** Iterates for training */
    (1 to $(nIterations)).foreach { iteration =>
      {
        logger.debug(s"TRAINING: Iteration n: $iteration")

        val iterationTimestamps = if (iteration == 1) {
          dataset.sparkSession.sparkContext.broadcast(Map.empty[(String, String), Long])
        } else {
          dataset.sparkSession.sparkContext.broadcast(timeTotalsAcc.value.mapValues(_._1))
        }

        val iterationWeights = if (iteration == 1) {
          dataset.sparkSession.sparkContext.broadcast(Map.empty[String, Map[String, Double]])
        } else {
          dataset.sparkSession.sparkContext.broadcast(featuresWeightAcc.value)
        }

        val iterationUpdateCount = if (iteration == 1) {
          dataset.sparkSession.sparkContext.broadcast[Long](0L)
        } else {
          dataset.sparkSession.sparkContext.broadcast[Long](updateIterationAcc.value)
        }

        val sortedSentences: Dataset[TaggedSentence] =
          cachedSentences.sort(rand()).sortWithinPartitions(rand())

        /** Cache of iteration datasets does not show any improvements, try sample? */
        sortedSentences.foreachPartition((partition: Iterator[TaggedSentence]) => {

          val _temp1 = ListBuffer.empty[((String, String), Long)]
          iterationTimestamps.value.copyToBuffer(_temp1)
          val newPartitionTimeTotals = MMap.empty[(String, String), (Long, Double)]
          val partitionTimestamps = _temp1.toMap
          _temp1.clear()

          val _temp2 = ListBuffer.empty[(String, Map[String, Double])]
          iterationWeights.value.copyToBuffer(_temp2)
          val newPartitionWeights = MMap.empty[String, MMap[String, Double]]
          val partitionWeights = _temp2.toMap
          _temp2.clear()

          var partitionUpdateCount: Long = iterationUpdateCount.value
          val partitionUpdateCountOriginal = partitionUpdateCount

          val partitionTotals: MMap[(String, String), Double] =
            MMap.empty[(String, String), Double]

          val twb = taggedWordBook.value
          val cls = classes.value

          def update(truth: String, guess: String, features: Iterable[String]): Unit = {
            def updateFeature(
                tag: String,
                feature: String,
                weight: Double,
                value: Double): Unit = {

              /** update totals and timestamps */
              val param = (feature, tag)
              val newTimestamp = partitionUpdateCount
              partitionTotals.update(
                param,
                partitionTotals.getOrElse(param, 0.0) + ((newTimestamp - newPartitionTimeTotals
                  .get(param)
                  .map(_._1)
                  .getOrElse(partitionTimestamps.getOrElse(param, 0L))) * weight))
              newPartitionTimeTotals.update(param, (newTimestamp, partitionTotals(param)))

              /** update weights */
              val newWeights =
                newPartitionWeights.getOrElse(feature, MMap()) ++ MMap(tag -> (weight + value))
              newPartitionWeights.update(feature, newWeights)
            }

            /** If the prediction was wrong, take all features and, for each feature, get the
              * feature's current tags and their weights; reward the correct tag and punish the
              * wrong one in weight.
              */
            if (truth != guess) {
              features.foreach { feature =>
                val weights = newPartitionWeights
                  .get(feature)
                  .map(pw => partitionWeights.getOrElse(feature, Map()) ++ pw)
                  .orElse(partitionWeights.get(feature))
                  .getOrElse(Map())
                updateFeature(truth, feature, weights.getOrElse(truth, 0.0), 1.0)
                updateFeature(guess, feature, weights.getOrElse(guess, 0.0), -1.0)
              }
            }
          }
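
          /* Illustrative example (not from the source): if the gold tag is "NN" but the
           * perceptron guessed "VB", then for every active feature of this token the weight
           * of ("feature", "NN") is increased by 1.0 and the weight of ("feature", "VB") is
           * decreased by 1.0; correct guesses leave all weights untouched.
           */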

          def predict(features: Map[String, Int]): String = {

            /** Scores are used for feature scores, which are all 0 by default. If a feature has
              * a relevant score, look up all its possible tags and their scores, multiply their
              * weights by the number of times the feature appears, and return the highest tag by
              * score.
              */
            val scoresByTag = features
              .filter { case (feature, value) =>
                (partitionWeights.contains(feature) || newPartitionWeights
                  .contains(feature)) && value != 0
              }
              .map { case (feature, value) =>
                newPartitionWeights
                  .get(feature)
                  .map(pw => partitionWeights.getOrElse(feature, Map()) ++ pw)
                  .getOrElse(partitionWeights(feature))
                  .map { case (tag, weight) =>
                    (tag, value * weight)
                  }
              }
              .aggregate(Map[String, Double]())(
                (tagsScores, tagScore) =>
                  tagScore ++ tagsScores.map { case (tag, score) =>
                    (tag, tagScore.getOrElse(tag, 0.0) + score)
                  },
                (pTagScore, cTagScore) =>
                  pTagScore.map { case (tag, score) =>
                    (tag, cTagScore.getOrElse(tag, 0.0) + score)
                  })

            /** ToDo: Watch it here. Because of the missing training corpus, default values are
              * provided to make tests pass. The secondary sort by tag is simply there to match
              * the original Python behavior.
              */
            cls.maxBy { tag =>
              (scoresByTag.getOrElse(tag, 0.0), tag)
            }
          }
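
          /* Illustrative example (not from the source): with features
           * Map("bias" -> 1, "suffix ing" -> 2) and weights
           * "bias" -> Map("NN" -> 0.3, "VBG" -> 0.1), "suffix ing" -> Map("VBG" -> 0.4),
           * the scores are NN = 1 * 0.3 = 0.3 and VBG = 1 * 0.1 + 2 * 0.4 = 0.9,
           * so predict returns "VBG".
           */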

          /** For each sentence in the shuffled list, try to find the tag of each word and hold
            * the correct answer.
            */
          partition.foreach { taggedSentence =>
            /** Defines a sentence context, with room to look back */
            var prev = START(0)
            var prev2 = START(1)
            val context = START ++: taggedSentence.words.map(w => normalized(w)) ++: END
            taggedSentence.words.zipWithIndex.foreach { case (word, i) =>
              val guess =
                twb.getOrElse(
                  word.toLowerCase, {
                    val features = getFeatures(i, word, context, prev, prev2)
                    val guess = predict(features)
                    partitionUpdateCount += 1L
                    update(taggedSentence.tags(i), guess, features.keys)
                    guess
                  })

              /** shift the context */
              prev2 = prev
              prev = guess
            }

          }
          featuresWeightAcc.addMany(newPartitionWeights)
          timeTotalsAcc.updateMany(newPartitionTimeTotals)
          updateIterationAcc.add(partitionUpdateCount - partitionUpdateCountOriginal)
        })
        if (doCache) { sortedSentences.unpersist() }
        iterationTimestamps.unpersist(true)
        iterationWeights.unpersist(true)
        iterationUpdateCount.unpersist(true)
      }
    }
    logger.debug("TRAINING: Finished all iterations")
    new PerceptronModel().setModel(
      averageWeights(
        classes,
        taggedWordBook,
        featuresWeightAcc,
        updateIterationAcc,
        timeTotalsAcc))
  }
}

/** This is the companion object of [[PerceptronApproachDistributed]]. Please refer to that class
  * for the documentation.
  */
object PerceptronApproachDistributed extends DefaultParamsReadable[PerceptronApproachDistributed]