• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

IQSS / dataverse / #22987

23 Aug 2024 06:44PM CUT coverage: 20.61% (-0.2%) from 20.791%
#22987

Pull #10781

github

landreev
added an upfront locks check to the /addGlobusFiles api #10623
Pull Request #10781: Improved handling of Globus uploads

4 of 417 new or added lines in 15 files covered. (0.96%)

4194 existing lines in 35 files now uncovered.

17388 of 84365 relevant lines covered (20.61%)

0.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/edu/harvard/iq/dataverse/rserve/RemoteDataFrameService.java
1
/*
2
   Copyright (C) 2005-2012, by the President and Fellows of Harvard College.
3

4
   Licensed under the Apache License, Version 2.0 (the "License");
5
   you may not use this file except in compliance with the License.
6
   You may obtain a copy of the License at
7

8
         http://www.apache.org/licenses/LICENSE-2.0
9

10
   Unless required by applicable law or agreed to in writing, software
11
   distributed under the License is distributed on an "AS IS" BASIS,
12
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
   See the License for the specific language governing permissions and
14
   limitations under the License.
15

16
   Dataverse Network - A web application to share, preserve and analyze research data.
17
   Developed at the Institute for Quantitative Social Science, Harvard University.
18
   Version 3.0.
19
*/
20
package edu.harvard.iq.dataverse.rserve;
21

22
import edu.harvard.iq.dataverse.DataFile;
import edu.harvard.iq.dataverse.dataaccess.DataAccess;
import edu.harvard.iq.dataverse.dataaccess.StorageIO;
import edu.harvard.iq.dataverse.dataaccess.DataAccessRequest;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

import edu.harvard.iq.dataverse.settings.JvmSettings;
import org.apache.commons.io.IOUtils;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.rosuda.REngine.*;
import org.rosuda.REngine.Rserve.*;
51

52
/**
53
 * 
54
 * @author Leonid Andreev
55
 * (the name is still tentative!)
56
 * parts of the code are borrowed from Akio Sone's DvnRforeignFileConversionServiceImpl,
57
 * developed for DVN v.2.*
58
 * 
59
 * original author:
60
 * @author Akio Sone
61
 */
62

63
public class RemoteDataFrameService {
64

65
    // ----------------------------------------------------- static filelds
66
    
UNCOV
67
    private static Logger logger = Logger.getLogger(RemoteDataFrameService.class.getPackage().getName());
×
68

69

UNCOV
70
    private static String TMP_DATA_FILE_NAME = "dataverseTabData_";
×
71
    private static String RWRKSP_FILE_PREFIX = "dataverseDataFrame_";
×
72
    private static String PREPROCESS_FILE_PREFIX = "dataversePreprocess_";
×
73

UNCOV
74
    private static String TMP_TABDATA_FILE_EXT = ".tab";
×
75
    private static String TMP_RDATA_FILE_EXT = ".RData";
×
76
    
77
    // These settings have sane defaults in resources/META-INF/microprofile-config.properties,
78
    // ready to be overridden by a sysadmin
79
    private final String RSERVE_HOST;
80
    private final String RSERVE_USER;
81
    private final String RSERVE_PWD;
82
    private final int    RSERVE_PORT;
83
    private final String RSERVE_TMP_DIR;
84
        
UNCOV
85
    private static String DATAVERSE_R_FUNCTIONS = "scripts/dataverse_r_functions.R";
×
86
    private static String DATAVERSE_R_PREPROCESSING = "scripts/preprocess.R";
×
87
    
UNCOV
88
    public String PID = null;
×
89
    public String tempFileNameIn = null;
×
90
    public String tempFileNameOut = null;
×
91

UNCOV
92
    public RemoteDataFrameService() {
×
93
        // These settings have sane defaults in resources/META-INF/microprofile-config.properties,
94
        // ready to be overridden by a sysadmin. Config sources have their own caches, so adding
95
        // these here means the setting can be changed dynamically without too much overhead.
UNCOV
96
        this.RSERVE_HOST = JvmSettings.RSERVE_HOST.lookup();
×
97
        this.RSERVE_USER = JvmSettings.RSERVE_USER.lookup();
×
98
        this.RSERVE_PWD = JvmSettings.RSERVE_PASSWORD.lookup();
×
99
        this.RSERVE_PORT = JvmSettings.RSERVE_PORT.lookup(Integer.class);
×
100
        this.RSERVE_TMP_DIR = JvmSettings.RSERVE_TEMPDIR.lookup();
×
101
        
102
        
103
        // initialization
UNCOV
104
        PID = RandomStringUtils.randomNumeric(6);
×
105

UNCOV
106
        tempFileNameIn = RSERVE_TMP_DIR + "/" + TMP_DATA_FILE_NAME
×
107
                + "." + PID + TMP_TABDATA_FILE_EXT;
108

UNCOV
109
        tempFileNameOut = RSERVE_TMP_DIR + "/" + RWRKSP_FILE_PREFIX
×
110
                + "." + PID + TMP_RDATA_FILE_EXT;
111

UNCOV
112
        logger.fine("tempFileNameIn=" + tempFileNameIn);
×
113
        logger.fine("tempFileNameOut=" + tempFileNameOut);
×
114

UNCOV
115
    }
×
116
    
117
    public Map<String, String> directConvert(File originalFile, String fmt){
118
        
UNCOV
119
        Map<String, String> result = new HashMap<>();
×
120
        try {
UNCOV
121
            RConnection connection = setupConnection();
×
122
            // send the data file to the Rserve side:
UNCOV
123
            InputStream inFile = new BufferedInputStream(new FileInputStream(originalFile));
×
124

UNCOV
125
            RFileOutputStream rOutFile = connection.createFile(tempFileNameIn);
×
126
            copyWithBuffer(inFile, rOutFile, 1024);
×
127
                        
128
            // We need to initialize our R session:
129
            // send custom R code library over to the Rserve and load the code:
UNCOV
130
            String rscript = readLocalResource(DATAVERSE_R_FUNCTIONS);
×
131
            connection.voidEval(rscript);
×
132
            
UNCOV
133
            String dataFileName = "Data." + PID + ".RData";
×
134
            
135
            // data file to be copied back to the dvn
UNCOV
136
            String dsnprfx = RSERVE_TMP_DIR + "/" + dataFileName;
×
137
            
UNCOV
138
            String command = "direct_export(file='"+tempFileNameIn+"'," +
×
139
                             "fmt='" + fmt + "'" + ", dsnprfx='" + dsnprfx + "')";
140
                        
UNCOV
141
            connection.voidEval(command);
×
142
            
UNCOV
143
            int wbFileSize = getFileSize(connection, dsnprfx);
×
144
            File localDataFrameFile = transferRemoteFile(connection, dsnprfx, RWRKSP_FILE_PREFIX,"RData", wbFileSize);
×
145
            
UNCOV
146
            if (localDataFrameFile != null){
×
147
                logger.fine("data frame file name: "+localDataFrameFile.getAbsolutePath());
×
148
                result.put("dataFrameFileName",localDataFrameFile.getAbsolutePath());
×
149
            } else {
UNCOV
150
                logger.fine("data frame file is null!");
×
151
                // throw an exception??
152
            }
153
            
UNCOV
154
            result.put("Rversion", connection.eval("R.Version()$version.string").asString());
×
155
            
UNCOV
156
            logger.fine("result object (before closing the Rserve):\n"+result);
×
157
            
UNCOV
158
            String deleteLine = "file.remove('"+tempFileNameIn+"')";
×
159
            connection.eval(deleteLine);
×
160
 
UNCOV
161
            connection.close();
×
162
        
UNCOV
163
        } catch (IOException | REXPMismatchException | RserveException e) {
×
164
            logger.severe(e.getMessage());
×
165
            result.put("RexecError", "true");
×
166
        }
×
167
        
UNCOV
168
        return result;
×
169
    }
170
    
171
    /*
172
     * Execute a data frame creation process:
173
     * 
174
     * (TODO: describe the process here; -- L.A. 4.0 alpha 1)
175
     *
176
     * @param sro    a RJobRequest object that contains various parameters
177
     * @return    a Map that contains various information about results
178
    
179
     * TODO: replace this Map with a dedicated RJobResult object; -- L.A. 4.0 alpha 1
180
     */    
181
    
182
    public Map<String, String> execute(RJobRequest jobRequest) {
UNCOV
183
        logger.fine("RemoteDataFrameService: execute() starts here.");
×
184
    
UNCOV
185
        Map<String, String> result = new HashMap<>();
×
186
        
187
        try {
UNCOV
188
            RConnection connection = setupConnection();
×
189
            // send the data file to the Rserve side:
UNCOV
190
            InputStream inFile = new BufferedInputStream(new FileInputStream(
×
191
                                     jobRequest.getTabularDataFileName()));
×
192

UNCOV
193
            RFileOutputStream rOutFile = connection.createFile(tempFileNameIn);
×
194
            copyWithBuffer(inFile, rOutFile, 1024);
×
195
            
196
            // Rserve code starts here
UNCOV
197
            logger.fine("wrkdir="+RSERVE_TMP_DIR);
×
198
                        
199
            // We need to initialize our R session:
200
            // send custom R code library over to the Rserve and load the code:
UNCOV
201
            String rscript = readLocalResource(DATAVERSE_R_FUNCTIONS);
×
202
            connection.voidEval(rscript);
×
203
            logger.fine("raw variable type="+Arrays.toString(jobRequest.getVariableTypes()));
×
204
            connection.assign("vartyp", new REXPInteger(jobRequest.getVariableTypes()));
×
205
        
206
            // variable *formats* - not to be confused with variable *types*!
207
            // these specify extra, optional format specifications - for example, 
208
            // String variables may represent date and time values. 
209
            
UNCOV
210
            Map<String, String> varFormat = jobRequest.getVariableFormats();
×
211
            
UNCOV
212
            logger.fine("tmpFmt="+varFormat);
×
213
            
214
            // In the fragment below we create an R list varFrmt storing 
215
            // these format specifications: 
216
            
UNCOV
217
            if (varFormat != null){
×
218
                String[] formatKeys = varFormat.keySet().toArray(new String[varFormat.size()]);
×
219
                String[] formatValues = getValueSet(varFormat, formatKeys);
×
220
                connection.assign("tmpfk", new REXPString(formatKeys));
×
221
                connection.assign("tmpfv", new REXPString(formatValues));
×
222
                String fmtNamesLine = "names(tmpfv)<- tmpfk";
×
223
                connection.voidEval(fmtNamesLine);
×
224
                String fmtValuesLine ="varFmt<- as.list(tmpfv)";
×
225
                connection.voidEval(fmtValuesLine);
×
226
            } else {
×
227
                connection.assign("varFmt", new REXPList(new RList(new ArrayList<>(),
×
228
                                                                   new String[]{})));
229
            }
230
            
231
            // Variable names:
UNCOV
232
            String [] jvnamesRaw = jobRequest.getVariableNames();
×
233
            String [] jvnames;
234
            
UNCOV
235
            if (jobRequest.hasUnsafeVariableNames){
×
236
                // create  list
UNCOV
237
                jvnames =  jobRequest.safeVarNames;
×
238
                logger.fine("renamed="+StringUtils.join(jvnames,","));
×
239
            } else {
UNCOV
240
                jvnames = jvnamesRaw;
×
241
            }
242
            
UNCOV
243
            connection.assign("vnames", new REXPString(jvnames));
×
244
            
245
            // confirm:
246
            
UNCOV
247
            String [] tmpjvnames = connection.eval("vnames").asStrings();
×
248
            logger.fine("vnames:"+ StringUtils.join(tmpjvnames, ","));
×
249
            
250
           
251
            // read.dataverseTabData method, from dataverse_r_functions.R, 
252
            // uses R's standard scan() function to read the tabular data we've 
253
            // just transfered over and turn it into a dataframe. It adds some 
254
            // custom post-processing too - restores missing values, converts 
255
            // strings representing dates and times into R date and time objects, 
256
            // and more. 
257
            
258
            // Parameters for the read.dataverseTabData method executed on the R side:
259
            
260
            // file -> tempFileName
261
            // col.names -> Arrays.deepToString(new REXPString(jvnames)).asStrings())
262
            // colClassesx -> Arrays.deepToString((new REXPInteger(sro.getVariableTypes())).asStrings())
263
            // varFormat -> Arrays.deepToString((new REXPString(getValueSet(tmpFmt, tmpFmt.keySet().toArray(new String[tmpFmt.keySet().size()])))).asStrings())
264

UNCOV
265
            logger.fine("read.dataverseTabData parameters:");
×
266
            logger.fine("col.names = " + Arrays.deepToString((new REXPString(jvnames)).asStrings()));
×
267
            logger.fine("colClassesx = " + Arrays.deepToString((new REXPInteger(jobRequest.getVariableTypes())).asStrings()));
×
268
            logger.fine("varFormat = " + Arrays.deepToString((new REXPString(getValueSet(varFormat, varFormat.keySet().toArray(new String[varFormat.keySet().size()])))).asStrings()));
×
269
            
UNCOV
270
            String readtableline = "x<-read.dataverseTabData(file='"+tempFileNameIn+
×
271
                "', col.names=vnames, colClassesx=vartyp, varFormat=varFmt )";
UNCOV
272
            logger.fine("readtable="+readtableline);
×
273

UNCOV
274
            connection.voidEval(readtableline);
×
275
        
UNCOV
276
            if (jobRequest.hasUnsafeVariableNames){
×
277
                logger.fine("unsafeVariableNames exist");
×
278
                jvnames = jobRequest.safeVarNames;
×
279
                String[] rawNameSet  = jobRequest.renamedVariableArray;
×
280
                String[] safeNameSet = jobRequest.renamedResultArray;
×
281
                
UNCOV
282
                connection.assign("tmpRN", new REXPString(rawNameSet));
×
283
                connection.assign("tmpSN", new REXPString(safeNameSet));
×
284
                
UNCOV
285
                String raw2safevarNameTableLine = "names(tmpRN)<- tmpSN";
×
286
                connection.voidEval(raw2safevarNameTableLine);
×
287
                String attrRsafe2rawLine = "attr(x, 'Rsafe2raw')<- as.list(tmpRN)";
×
288
                connection.voidEval(attrRsafe2rawLine);
×
289
            } else {
×
290
                String attrRsafe2rawLine = "attr(x, 'Rsafe2raw')<-list();";
×
291
                connection.voidEval(attrRsafe2rawLine);
×
292
            }
293
            
294
            // Restore NAs (missign values) in the data frame:
295
            // (these are encoded as empty strings in dataverse tab files)
296
            // Why are we doing it here? And not in the dataverse_r_functions.R 
297
            // fragment? 
298
            
UNCOV
299
            String asIsline  = "for (i in 1:dim(x)[2]){ "+
×
300
                "if (attr(x,'var.type')[i] == 0) {" +
301
                "x[[i]]<-I(x[[i]]);  x[[i]][ x[[i]] == '' ]<-NA  }}";
UNCOV
302
            connection.voidEval(asIsline);
×
303
            
UNCOV
304
            String[] varLabels = jobRequest.getVariableLabels();
×
305
             
UNCOV
306
            connection.assign("varlabels", new REXPString(varLabels));
×
307
            
UNCOV
308
            String attrVarLabelsLine = "attr(x, 'var.labels')<-varlabels";
×
309
            connection.voidEval(attrVarLabelsLine);
×
310
            
311
            // Confirm:
UNCOV
312
            String [] vlbl = connection.eval("attr(x, 'var.labels')").asStrings();
×
313
            logger.fine("varlabels="+StringUtils.join(vlbl, ","));
×
314
        
315
            // create the VALTABLE and VALORDER lists:
UNCOV
316
            connection.voidEval("VALTABLE<-list()");
×
317
            connection.voidEval("VALORDER<-list()");
×
318

319
            // In the fragment below, we'll populate the VALTABLE list that we've
320
            // just created with the actual values and labels of our categorical varaibles.
321
            // TODO: 
322
            // This code has been imported from the DVN v2-3
323
            // implementation. I keep wondering if there is a simpler way to
324
            // achive this - to pass these maps of values and labels to R 
325
            // in fewer steps/with less code - ?
326
            // -- L.A. 4.3
327
            
UNCOV
328
            Map<String, Map<String, String>> valueTable = jobRequest.getValueTable();
×
329
            Map<String, List<String>> orderedCategoryValues = jobRequest.getCategoryValueOrders();
×
330
            String[] variableIds = jobRequest.getVariableIds();
×
331

UNCOV
332
            for (int j = 0; j < variableIds.length; j++) {
×
333
                // if this variable has a value-label table,
334
                // pass its key and value arrays to Rserve;
335
                // finalize a value-table on the Rserve side:
336

UNCOV
337
                String varId = variableIds[j];
×
338

UNCOV
339
                if (valueTable.containsKey(varId)) {
×
340

UNCOV
341
                    Map<String, String> tmp = valueTable.get(varId);
×
342
                    Set<String> variableKeys = tmp.keySet();
×
343
                    String[] tmpk = variableKeys.toArray(new String[variableKeys.size()]);
×
344
                    String[] tmpv = getValueSet(tmp, tmpk);
×
345

UNCOV
346
                    logger.fine("tmp:k=" + StringUtils.join(tmpk, ","));
×
347
                    logger.fine("tmp:v=" + StringUtils.join(tmpv, ","));
×
348

UNCOV
349
                    if (tmpv.length > 0) {
×
350
                        connection.assign("tmpk", new REXPString(tmpk));
×
351
                        connection.assign("tmpv", new REXPString(tmpv));
×
352

UNCOV
353
                        String namesValueLine = "names(tmpv)<- tmpk";
×
354
                        connection.voidEval(namesValueLine);
×
355

356
                        // index number starts from 1(not 0):
UNCOV
357
                        String sbvl = "VALTABLE[['" + (j + 1) + "']]" + "<- as.list(tmpv)";
×
358
                        logger.fine("frag=" + sbvl);
×
359
                        connection.voidEval(sbvl);
×
360

361
                        // confirmation test for j-th variable name
UNCOV
362
                        REXP jl = connection.parseAndEval(sbvl);
×
363
                        logger.fine("jl(" + j + ") = " + jl);
×
364
                    }
365
                }
366
                
367
                // If this is an ordered categorical value (and that means,
368
                // it was produced from an ordered factor, from an ingested 
369
                // R data frame, since no other formats we support have 
370
                // ordered categoricals), we'll also supply a list of these
371
                // ordered values:
372
                
373
                
UNCOV
374
                if (orderedCategoryValues != null && orderedCategoryValues.containsKey(varId)) {
×
375
                    List<String> orderList = orderedCategoryValues.get(varId);
×
376
                    if (orderList != null) {
×
377
                        String[] ordv = (String[]) orderList.toArray(new String[orderList.size()]);
×
378
                        logger.fine("ordv="+ StringUtils.join(ordv,","));
×
379
                        connection.assign("ordv", new REXPString(ordv));
×
380
                        String sbvl = "VALORDER[['"+ Integer.toString(j + 1)+"']]" + "<- as.list(ordv)";
×
381
                        logger.fine("VALORDER[...]="+sbvl);
×
382
                        connection.voidEval(sbvl);
×
383
                    } else {
×
384
                        logger.fine("NULL orderedCategoryValues list.");
×
385
                    }
386
                }
387
            }
388

389
            // And now we store the VALTABLE and MSVLTBL as attributes of the 
390
            // dataframe we are cooking:
UNCOV
391
            logger.fine("length of vl=" + connection.eval("length(VALTABLE)").asInteger());
×
392
            String attrValTableLine = "attr(x, 'val.table')<-VALTABLE";
×
393
            connection.voidEval(attrValTableLine);
×
394
 
UNCOV
395
            String msvStartLine = "MSVLTBL<-list();";
×
396
            connection.voidEval(msvStartLine);
×
397
            String attrMissvalLine = "attr(x, 'missval.table')<-MSVLTBL";
×
398
            connection.voidEval(attrMissvalLine);
×
399
            
400
            // But we are not done, with these value label maps... We now need
401
            // to call these methods from the dataverse_r_functions.R script
402
            // to further process the lists. Among other things, they will 
403
            // create these new lists - value index and missing value index, that 
404
            // simply indicate which variables have any of the above; these will 
405
            // also be saved as attributes of the data frame, val.index and 
406
            // missval.index respectively. But, also, the methods will reprocess
407
            // and overwite the val.table and missval.table attributes already stored in 
408
            // the dataframe. I don't fully understand why that is necessary, or what it is
409
            // that we are actually adding to the lists there... Another TODO: ? 
410
            
411
            
UNCOV
412
            String createVIndexLine = "x<-createvalindex(dtfrm=x, attrname='val.index');";
×
413
            connection.voidEval(createVIndexLine);
×
414
            String createMVIndexLine = "x<-createvalindex(dtfrm=x, attrname='missval.index');";
×
415
            connection.voidEval(createMVIndexLine);
×
416

417
           
418
            // And now we'll call the last method from the R script - createDataverseDataFrame();
419
            // It should probably be renamed. The dataframe has already been created. 
420
            // what this method does, it goes through the frame, and changes the 
421
            // vectors representing categorical variables to R factors. 
422
            // For example, if this tabular file was produced from a Stata file 
423
            // that had a categorical in which "Male" and "Female" were represented 
424
            // with 0 and 1. In the Dataverse datbase, the string values "Male" and 
425
            // "Female" are now stored as "categorical value labels". And the column 
426
            // in the tab file has numeric 1 and 0s. That's what the R
427
            // dataframe was created from, so it now has a numeric vector of 1s and 0s
428
            // representing this variable. So in this step we are going 
429
            // to change this vector into a factor, using the labels and values 
430
            // that we already passed over via Rserve and stored in the val.table, above. 
431
            
432
            // TODO: 
433
            // I'm going to propose that we go back to what we used to do back in 
434
            // DVN 2-3.* - instead of giving the user a single dataframe (.RData) 
435
            // file, provide a zip file, with the data frame, and also a README 
436
            // file with some documentation explaining how the data frame was 
437
            // created, and pointing out some potential issues stemming from the 
438
            // conversion between formats. Converting Stata categoricals into 
439
            // R factors is one of such issues (if nothing else, do note that 
440
            // the UNF of the datafile with the column described in the example 
441
            // above will change, if the resulting R dataframe is reingested! See 
442
            // the UNF documentation for more info...). We may also make this 
443
            // download interactive - giving the user some options for how 
444
            // to handle the conversion (so, another choice would be to convert 
445
            // the above to a factor of "0" and "1"s), etc. 
446
            // -- L.A. 4.3
447
                            
UNCOV
448
            String dataFileName = "Data." + PID + "." + jobRequest.getFormatRequested();
×
449
            
450
            // data file to be copied back to the dvn
UNCOV
451
            String dsnprfx = RSERVE_TMP_DIR + "/" + dataFileName;
×
452
            
UNCOV
453
            String dataverseDataFrameCommand = "createDataverseDataFrame(dtfrm=x,"+
×
454
                "dwnldoptn='"+jobRequest.getFormatRequested()+"'"+
×
455
                ", dsnprfx='"+dsnprfx+"')";
456
                        
UNCOV
457
            connection.voidEval(dataverseDataFrameCommand);
×
458
            
UNCOV
459
            int wbFileSize = getFileSize(connection,dsnprfx);
×
460
            
UNCOV
461
            logger.fine("wbFileSize="+wbFileSize);
×
462
            
UNCOV
463
            result.putAll(buildResult(connection, dsnprfx, wbFileSize, result));
×
464
        } catch (Exception e) {
×
465
            logger.severe(e.getMessage());
×
466
            result.put("RexecError", "true");
×
467
        }
×
468
        
UNCOV
469
        return result;
×
470
        
471
    }
472

473
    private Map<String, String> buildResult(RConnection connection, String dsnprfx, int wbFileSize, Map<String, String> result) throws RserveException, REXPMismatchException {
474
        // If the above succeeded, the dataframe has been saved on the
475
        // Rserve side as an .Rdata file. Now we can transfer it back to the
476
        // dataverse side:
UNCOV
477
        File localDataFrameFile = transferRemoteFile(connection, dsnprfx, RWRKSP_FILE_PREFIX,"RData", wbFileSize);
×
478
        
UNCOV
479
        if (localDataFrameFile != null){
×
480
            logger.fine("data frame file name: "+localDataFrameFile.getAbsolutePath());
×
481
            result.put("dataFrameFileName",localDataFrameFile.getAbsolutePath());
×
482
        } else {
UNCOV
483
            logger.warning("data frame file is null!");
×
484
            // throw an exception??
485
        }
486
        
UNCOV
487
        result.put("Rversion", connection.eval("R.Version()$version.string").asString());
×
488
        
UNCOV
489
        logger.fine("result object (before closing the Rserve):\n"+result);
×
490
        
UNCOV
491
        String deleteLine = "file.remove('"+tempFileNameIn+"')";
×
492
        connection.eval(deleteLine);
×
493
        connection.close();
×
494
        return result;
×
495
    }
496

497
    private RConnection setupConnection() throws REXPMismatchException, RserveException {
498
        // Set up an Rserve connection
UNCOV
499
        logger.fine("RSERVE_USER="+RSERVE_USER+"[default=rserve]");
×
500
        logger.fine("RSERVE_PASSWORD="+RSERVE_PWD+"[default=rserve]");
×
501
        logger.fine("RSERVE_PORT="+RSERVE_PORT+"[default=6311]");
×
502
        logger.fine("RSERVE_HOST="+RSERVE_HOST);
×
503
        RConnection connection = new RConnection(RSERVE_HOST, RSERVE_PORT);
×
504
        connection.login(RSERVE_USER, RSERVE_PWD);
×
505
        logger.fine(">" + connection.eval("R.version$version.string").asString() + "<");
×
506
        // check working directories
507
        // This needs to be done *before* we try to create any files
508
        // there!
UNCOV
509
        setupWorkingDirectory(connection);
×
510
        return connection;
×
511
    }
512
    
513
    public void setupWorkingDirectory(RConnection connection) {
514
        
515
        try {
516
            // check the temp directory; try to create it if it doesn't exist:
517

UNCOV
518
            String checkWrkDir = "if (!file_test('-d', '" + RSERVE_TMP_DIR + "')) {dir.create('" + RSERVE_TMP_DIR + "', showWarnings = FALSE, recursive = TRUE);}";
×
519

UNCOV
520
            logger.fine("w permission=" + checkWrkDir);
×
521
            connection.voidEval(checkWrkDir);
×
522

UNCOV
523
        } catch (RserveException rse) {
×
524
            rse.printStackTrace();
×
525
        }
×
526
    }
×
527
    
528

529
    public File runDataPreprocessing(DataFile dataFile) {
UNCOV
530
        if (!dataFile.isTabularData()) {
×
531
            return null;
×
532
        }
533

UNCOV
534
        File preprocessedDataFile = null; 
×
535
        
536
        try {
537
            
538
            // Set up an Rserve connection
539
            
UNCOV
540
            RConnection connection = new RConnection(RSERVE_HOST, RSERVE_PORT);
×
541

UNCOV
542
            connection.login(RSERVE_USER, RSERVE_PWD);            
×
543
            // check working directories
544
            // This needs to be done *before* we try to create any files 
545
            // there!
UNCOV
546
            setupWorkingDirectory(connection);
×
547
            
548
            // send the tabular data file to the Rserve side:
549
            
UNCOV
550
            StorageIO<DataFile> accessObject = DataAccess.getStorageIO(dataFile,
×
551
                                                        new DataAccessRequest());
552
            
UNCOV
553
            if (accessObject == null) {
×
554
                return null; 
×
555
            }
556
            
UNCOV
557
            accessObject.open();
×
558
            InputStream is = accessObject.getInputStream();
×
559
            if (is == null) {
×
560
                return null; 
×
561
            }
562
                    
563
            // Create the output stream on the remote, R end: 
564
            
UNCOV
565
            RFileOutputStream rOutStream = connection.createFile(tempFileNameIn);   
×
566
            
567

568
            // before writing out any bytes from the input stream, flush
569
            // any extra content, such as the variable header for the 
570
            // subsettable files:
UNCOV
571
            if (accessObject.getVarHeader() != null) {
×
572
                rOutStream.write(accessObject.getVarHeader().getBytes());
×
573
            }
574

UNCOV
575
            copyWithBuffer(is, rOutStream, 4*8192); 
×
576
            
577
            // Rserve code starts here
UNCOV
578
            logger.fine("wrkdir="+RSERVE_TMP_DIR);
×
579
            
580
            // Locate the R code and run it on the temp file we've just 
581
            // created: 
582
            
UNCOV
583
            connection.voidEval("library(rjson)");
×
584
            String rscript = readLocalResource(DATAVERSE_R_PREPROCESSING);
×
585
            logger.fine("preprocessing R code: "+rscript.substring(0,64));
×
586
            connection.voidEval(rscript);
×
587
            
UNCOV
588
            String runPreprocessing = "json<-preprocess(filename=\""+ tempFileNameIn +"\")";
×
589
            logger.fine("data preprocessing command: "+runPreprocessing);
×
590
            connection.voidEval(runPreprocessing);
×
591
                        
592
            // Save the output in a temp file: 
593
            
UNCOV
594
            String saveResult = "write(json, file='"+ tempFileNameOut +"')";
×
595
            logger.fine("data preprocessing save command: "+saveResult);
×
596
            connection.voidEval(saveResult);
×
597
            
598
            // Finally, transfer the saved file back on the application side:
599
            
UNCOV
600
            int fileSize = getFileSize(connection,tempFileNameOut);
×
601
            preprocessedDataFile = transferRemoteFile(connection, tempFileNameOut, PREPROCESS_FILE_PREFIX, "json", fileSize);
×
602
            
UNCOV
603
            String deleteLine = "file.remove('"+tempFileNameOut+"')";
×
604
            connection.eval(deleteLine);
×
605
            
UNCOV
606
            connection.close();
×
607
        } catch (Exception ex){
×
608
            ex.printStackTrace();
×
609
            return null ;
×
610
        }
×
611

612
            
UNCOV
613
        return preprocessedDataFile;
×
614
    }
615

616
    private void copyWithBuffer(InputStream is, RFileOutputStream rOutStream, int bufSize) throws IOException {
UNCOV
617
        byte[] buffer = new byte[bufSize];
×
618
       bufSize = is.read(buffer);
×
619
        while (bufSize != -1) {
×
620
            rOutStream.write(buffer, 0, bufSize);
×
621
            bufSize = is.read(buffer);
×
622
        }
623
        
UNCOV
624
        is.close();
×
625
        rOutStream.close();
×
626
    }
×
627
    
628
    // utility methods:
629
    
630
    /**
631
     * Returns the array of map values, that corresponds to the order of 
632
     * the keys provided in the keys array.
633
     * 
634
     */
635
    public static String[] getValueSet(Map<String, String> map, String[] keys) {
UNCOV
636
        String[] result = new String[keys.length];
×
637
        for (int i = 0; i<keys.length; i++){
×
638
            result[i] = map.get(keys[i]);
×
639
        }
UNCOV
640
        return result;
×
641
    }
642
    
643
    
644
    /*
645
     * the method that does the actual data frame request:
646
     * (TODO: may not need to be a separate method -- something for the final cleanup ?
647
     * -- L.A. 4.0 alpha 1)
648
     */
649
    public Map<String, String> runDataFrameRequest(RJobRequest jobRequest, RConnection connection){
650
            
UNCOV
651
        Map<String, String> sr = new HashMap<>();
×
652
                
653
        try {
UNCOV
654
            String dataFileName = "Data." + PID + "." + jobRequest.getFormatRequested();
×
655
            
656
            // data file to be copied back to the dvn
UNCOV
657
            String dsnprfx = RSERVE_TMP_DIR + "/" + dataFileName;
×
658
            
UNCOV
659
            String dataverseDataFrameCommand = "createDataverseDataFrame(dtfrm=x,"+
×
660
                "dwnldoptn='"+jobRequest.getFormatRequested()+"'"+
×
661
                ", dsnprfx='"+dsnprfx+"')";
662
                        
UNCOV
663
            connection.voidEval(dataverseDataFrameCommand);
×
664
            
UNCOV
665
            int wbFileSize = getFileSize(connection,dsnprfx);
×
666
            
UNCOV
667
            logger.fine("wbFileSize="+wbFileSize);
×
668
            
UNCOV
669
        } catch (RserveException rse) {
×
670
            rse.printStackTrace();
×
671
            sr.put("RexecError", "true");
×
672
            return sr;
×
673
        }
×
674

UNCOV
675
        sr.put("RexecError", "false");
×
676
        return sr;
×
677
    }
678
        
679
    
680
    public File transferRemoteFile(RConnection connection, String targetFilename,
681
            String tmpFilePrefix, String tmpFileExt, int fileSize) {
682

683
        // set up a local temp file:
UNCOV
684
        File tmpResultFile = null;
×
685
        RFileInputStream rInStream = null;
×
686
        OutputStream outbr = null;
×
687
        try {
UNCOV
688
            tmpResultFile = File.createTempFile(tmpFilePrefix + PID, "."+tmpFileExt);
×
689
            outbr = new BufferedOutputStream(new FileOutputStream(tmpResultFile));
×
690
            // open the input stream
UNCOV
691
            rInStream = connection.openFile(targetFilename);
×
692
            if (fileSize < 1024 * 1024 * 500) {
×
693
                byte[] obuf = new byte[fileSize];
×
694
                rInStream.read(obuf);
×
695
                outbr.write(obuf, 0, fileSize);
×
696
            }
UNCOV
697
            rInStream.close();
×
698
            outbr.close();
×
699
            return tmpResultFile;
×
700
        } catch (FileNotFoundException fe) {
×
701
            fe.printStackTrace();
×
702
            logger.fine("FileNotFound exception occurred");
×
703
            return tmpResultFile;
×
704
        } catch (IOException ie) {
×
705
            ie.printStackTrace();
×
706
            logger.fine("IO exception occurred");
×
707
        } finally {
UNCOV
708
            if (rInStream != null) {
×
709
                try {
UNCOV
710
                    rInStream.close();
×
711
                } catch (IOException e) {
×
712

UNCOV
713
                }
×
714
            }
715

UNCOV
716
            if (outbr != null) {
×
717
                try {
UNCOV
718
                    outbr.close();
×
719
                } catch (IOException e) {
×
720

UNCOV
721
                }
×
722
            }
723

724
        }
725
        
726
        // delete remote file: 
727
        
728
        try {
UNCOV
729
            String deleteLine = "file.remove('"+targetFilename+"')";
×
730
            connection.eval(deleteLine);
×
731
        } catch (Exception ex) {
×
732
            // do nothing.
UNCOV
733
        }
×
734
        
UNCOV
735
        return tmpResultFile;
×
736
    }
737
    
738
   
739
    public int getFileSize(RConnection connection, String targetFilename){
UNCOV
740
        logger.fine("targetFilename="+targetFilename);
×
741
        int fileSize = 0;
×
742
        try {
UNCOV
743
            String fileSizeLine = "round(file.info('"+targetFilename+"')$size)";
×
744
            fileSize = connection.eval(fileSizeLine).asInteger();
×
745
        } catch (RserveException | REXPMismatchException ex) {
×
746
            ex.printStackTrace();
×
747
        }
×
748
        return fileSize;
×
749
    }
750
    
751
    private static String readLocalResource(String path) {
752
        
UNCOV
753
        logger.fine(String.format("Data Frame Service: readLocalResource: reading local path \"%s\"", path));
×
754

755
        // Get stream
UNCOV
756
        InputStream resourceStream = RemoteDataFrameService.class.getResourceAsStream(path);
×
757
        String resourceAsString = "";
×
758

759
        // Try opening a buffered reader stream
760
        try {
UNCOV
761
            resourceAsString = IOUtils.toString(resourceStream, "UTF-8");
×
762
            resourceStream.close();
×
763
        } catch (IOException ex) {
×
764
            logger.warning(String.format("RDATAFileReader: (readLocalResource) resource stream from path \"%s\" was invalid", path));
×
765
        }
×
766
        return resourceAsString;
×
767
    }
768
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc