-
Notifications
You must be signed in to change notification settings - Fork 120
/
PickUpPlace.java
executable file
·759 lines (667 loc) · 28.3 KB
/
PickUpPlace.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
package emissary.pickup;
import emissary.core.DataObjectFactory;
import emissary.core.EmissaryException;
import emissary.core.Form;
import emissary.core.IBaseDataObject;
import emissary.core.IMobileAgent;
import emissary.core.NamespaceException;
import emissary.core.channels.InMemoryChannelFactory;
import emissary.log.MDCConstants;
import emissary.parser.ParserEOFException;
import emissary.parser.ParserException;
import emissary.parser.ParserFactory;
import emissary.parser.SessionParser;
import emissary.parser.SessionProducer;
import emissary.place.AgentsNotSupportedPlace;
import emissary.place.IServiceProviderPlace;
import emissary.place.ServiceProviderPlace;
import emissary.pool.AgentPool;
import emissary.spi.ObjectTracing;
import emissary.spi.ObjectTracingService;
import emissary.util.ClassComparator;
import emissary.util.TimeUtil;
import emissary.util.shell.Executrix;
import org.slf4j.MDC;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import static emissary.core.constants.Parameters.FILE_DATE;
import static emissary.core.constants.Parameters.FILE_NAME;
/**
* This class is the base class of those places that inject data into the system. This place knows a lot about
* processing files of input, including files of sessions that must be identified and parsed using a ParserFactory. It
* knows nothing about where the data comes from, though. The method of input, either a directory or set of directories
* to monitor, a socket, a WorkSpace provider, or something else, comes from classes that extend this one.
*/
public abstract class PickUpPlace extends ServiceProviderPlace implements IPickUpPlace, AgentsNotSupportedPlace {
// Any data picked up with less/more bytes than this will be
// set to ERROR initially, can be overridden in config files
// for pickup places.
protected int minimumContentLength = 10;
protected long maximumContentLength = 1048567;
protected String oversizeArea = "OversizeData";
// Directory store original data while processing
protected String holdingArea;
// Where to move data that has an error
protected String errorArea;
// Where to move data when done
@Nullable
protected String doneArea;
// Our parser factory
protected ParserFactory parserFactory = new ParserFactory();
// True turns off data identification engine to just favor
// the simple parsers
protected boolean simpleMode = false;
// Reference to global agent pool for out payloads
protected AgentPool agentPool;
// Initial forms for new data, read from config file
protected List<String> initialFormValues = Collections.emptyList();
// Metadata items that should always be copied to children
protected Set<String> ALWAYS_COPY_METADATA_VALS = new HashSet<>();
protected boolean useObjectTraceLogger = false;
public PickUpPlace() throws IOException {
super();
configurePickUpPlace();
}
public PickUpPlace(InputStream configStream) throws IOException {
super(configStream);
configurePickUpPlace();
}
/**
* Create a pick up place
*
* @param configInfo the config location
* @param placeLocation the place key
* @throws IOException If there is some I/O problem.
*/
public PickUpPlace(String configInfo, String placeLocation) throws IOException {
this(configInfo, null, placeLocation);
}
/**
* Create a pick up place
*
* @param configInfo the config location
* @param dir the key of the controlling directory
* @param placeLoc the place key
* @throws IOException If there is some I/O problem.
*/
public PickUpPlace(String configInfo, @Nullable String dir, String placeLoc) throws IOException {
super(configInfo, dir, placeLoc);
configurePickUpPlace();
}
/**
* Create a pick up place
*
* @param configStream the config stream
* @param dir the key of the controlling directory
* @param placeLoc the place key
* @throws IOException If there is some I/O problem.
*/
public PickUpPlace(InputStream configStream, String dir, String placeLoc) throws IOException {
super(configStream, dir, placeLoc);
configurePickUpPlace();
}
/**
* Create a pick up place
*
* @param configStream the config stream
* @param placeLoc the place key
* @throws IOException If there is some I/O problem.
*/
public PickUpPlace(InputStream configStream, String placeLoc) throws IOException {
super(configStream, placeLoc);
configurePickUpPlace();
}
/**
* Configure the place specific items
* <ul>
* <li>MINIMUM_DATA_SIZE: min size in bytes of a file</li>
* <li>MAXIMUM_DATA_SIZE: max size in bytes of a file, -1 for unlimited</li>
* <li>OVERSIZE_DATA_HOLDING_AREA: where to put oversize data</li>
* <li>HOLDING_AREA: where to put data while inprocess</li>
* <li>ERROR_DATA: where to put things that have errors</li>
* <li>DONE_DATA: where it goes when done</li>
* <li>SIMPLE_MODE: boolean when true turns off DataIdentification engine</li>
* <li>INITIAL_FORM: one or more forms for new payloads</li>
* </ul>
*/
protected void configurePickUpPlace() {
minimumContentLength = configG.findIntEntry("MINIMUM_DATA_SIZE", minimumContentLength);
maximumContentLength = configG.findSizeEntry("MAXIMUM_DATA_SIZE", maximumContentLength);
oversizeArea = configG.findStringEntry("OVERSIZE_DATA_HOLDING_AREA", oversizeArea);
simpleMode = configG.findBooleanEntry("SIMPLE_MODE", false);
holdingArea = configG.findCanonicalFileNameEntry("HOLDING_AREA", null);
doneArea = configG.findCanonicalFileNameEntry("DONE_DATA", doneArea);
errorArea = configG.findCanonicalFileNameEntry("ERROR_DATA", "errorArea");
if (doneArea != null && doneArea.equals("")) {
doneArea = null;
logger.info("Alert: Completed data will be deleted from the system due to DONE_AREA setting");
}
logger.debug("Pickup Canonical HOLD => {}, Pickup Canonical DONE => {}, Pickup Canonical ERROR => {}", holdingArea, doneArea, errorArea);
initialFormValues = configG.findEntries("INITIAL_FORM");
if (initialFormValues.size() < 1) {
initialFormValues.add(Form.UNKNOWN);
}
// Grab the default pool
try {
agentPool = AgentPool.lookup();
} catch (NamespaceException e) {
logger.warn("Cannot find agent pool!");
}
ALWAYS_COPY_METADATA_VALS = configG.findEntriesAsSet("ALWAYS_COPY_METADATA");
// Whether or not to use the objectTrace logger
useObjectTraceLogger = configG.findBooleanEntry("USE_OBJECT_TRACE_LOGGER", useObjectTraceLogger);
}
/**
* Return the value of the inprocess area, usually a directory path
*
* @return holdingArea string
*/
@Override
public String getInProcessArea() {
return holdingArea;
}
/**
* Return the value of the error area, usually a directory path
*
* @return errorArea string
*/
@Override
public String getErrorArea() {
return errorArea;
}
/**
* Return the value of the done area, usually a directory path
*
* @return doneArea string
*/
@Override
public String getDoneArea() {
return doneArea;
}
/**
* Return the maximum content size for a file that can be handled by this place
*
* @return maximumContentLength string
*/
@Override
public long getMaximumContentLength() {
return maximumContentLength;
}
/**
* Return the minumum content size for a file that can be handled by this place
*
* @return minimumContentLength string
*/
@Override
public int getMinimumContentLength() {
return minimumContentLength;
}
/**
* Return the value of the oversize area, usually a directory path
*
* @return path to the oversize area
*/
@Override
public String getOversizeArea() {
return oversizeArea;
}
/**
* Add metadata as the data objects are created Can be overridden to customize behavior
*
* @param d the nascent data object
* @param f the file it came from
*/
protected void dataObjectCreated(IBaseDataObject d, File f) {
d.putParameter(FILE_DATE, TimeUtil.getDateAsISO8601(f.lastModified()));
d.putParameter(FILE_NAME, f.getName());
}
/**
* Call back from a data server or queue server when a new file is ready to process. This method is called for raw
* files, not work bundles, so the simpleMode determination is made by this Place configuration.
*
* @param f file to process
* @return true if it worked
* @throws IOException If there is some I/O problem.
*/
public boolean processDataFile(File f) throws IOException, EmissaryException {
boolean isOversize = false;
if (maximumContentLength != -1 && f.length() > maximumContentLength) {
logger.warn("Sorry, This file is too large ({} < {}): {}", f.length(), maximumContentLength, f.getPath());
isOversize = true;
// Let it continue on knowing it is too big
// as we may need a record of the file
}
String fixedName = fixFileName(f.getName());
return processDataFile(f, fixedName, isOversize, simpleMode, getDoneArea());
}
/**
* Call back from a data server or queue server when a new file is ready to process
*
* @param theFile file to process
* @param fixedName the good short name of the file
* @param isOversize true if the content is too big by configuration
* @param simpleMode true if no session parsing is desired
* @param outputRoot the done area
* @return true if it worked
* @throws IOException If there is some I/O problem.
*/
public boolean processDataFile(File theFile, String fixedName, boolean isOversize, boolean simpleMode, String outputRoot) throws IOException,
EmissaryException {
boolean success = true;
logger.debug("Starting processDataFile in PickUpPlace for {}", theFile);
ObjectTracingService.emitLifecycleEvent(null, fixedName, ObjectTracing.Stage.PickUp, useObjectTraceLogger);
// Handle oversize data quickly without reading the file
if (isOversize) {
handleOversizePayload(theFile, fixedName, simpleMode);
}
// Handle it without session parsing if simple mode is on
else if (simpleMode) {
handleSimplePayload(theFile, fixedName);
}
// Parse sessions out of the file
else {
try {
logger.debug("Starting processSessions on {}", theFile);
processSessions(theFile, fixedName);
logger.debug("Finished with processSessions on {}", theFile);
} catch (ParserException ex) {
logger.error("Cannot parse {}", theFile.getName(), ex);
success = false;
}
}
if (success) {
handleFileSuccess(theFile, outputRoot);
} else {
handleFileError(theFile);
}
logger.debug("Ending processDataFile {} {} {}", theFile, (success ? "success" : "failure"), (simpleMode ? "simple" : ""));
return success;
}
/**
* Handle oversize payload item
*
* @param theFile the file with the oversize data
* @param fixedName name to use for the object
* @param simpleMode simple flag from the input
* @return true
*/
protected boolean handleOversizePayload(File theFile, String fixedName, boolean simpleMode) throws EmissaryException {
// Send it away, blocks until an agent is ready
IBaseDataObject dataObject =
DataObjectFactory.getInstance(new Object[] {("The file is oversize at " + theFile.length() + " bytes").getBytes(), fixedName,
"OVERSIZE"});
dataObject.setParameter("SIMPLE_MODE", Boolean.toString(simpleMode));
dataObjectCreated(dataObject, theFile);
logger.info("**Deploying an agent for oversized {} and object {} simple={}", fixedName, dataObject.getInternalId(),
(simpleMode ? "simple" : ""));
assignToPooledAgent(dataObject, -1L);
return true;
}
/**
* Action to handle a simple mode File
*
* @param theFile the file that contains the data
* @param fixedName name to use for the dataObject
* @return true if the file is processed successfully
*/
protected boolean handleSimplePayload(File theFile, String fixedName) throws EmissaryException {
byte[] theContent = Executrix.readDataFromFile(theFile.getAbsolutePath());
return processDataObject(theContent, fixedName, theFile, true);
}
/**
* Action to move th file to the done area when successfully processed
*
* @param theFile the file that was processed
* @return true if the file was renamed
*/
protected boolean renameFileToDoneArea(File theFile) {
return renameFileToDoneArea(theFile, getDoneArea());
}
/**
* Action to move th file to the done area when successfully processed using the specified outut area
*
* @param theFile the file that was processed
* @param outputRoot a specified output root
* @return true if the file was renamed
*/
protected boolean renameFileToDoneArea(File theFile, @Nullable String outputRoot) {
String base = theFile.getPath();
boolean renamed = false;
if (holdingArea != null) {
base = base.substring(holdingArea.length());
}
if (outputRoot != null) {
File dest = new File(outputRoot + "/" + base);
dest.getParentFile().mkdirs();
renamed = theFile.renameTo(dest);
if (renamed) {
logger.info("{} processed and moved to done area {}", theFile.getName(), outputRoot);
} else {
logger.warn("{} processed but could not be moved to done area as {}", theFile.getName(), dest);
}
}
return renamed;
}
/**
* Get the endpoint file name for when the file is move to inProcess
*
* @param theFile the file to be considered
* @param eatPrefix optional prefix strip from the work bundle
* @return null if no holdingArea, else the new File endpoint
*/
@Nullable
protected File getInProcessFileNameFor(File theFile, @Nullable String eatPrefix) {
String base = theFile.getPath();
if (eatPrefix != null) {
base = base.substring(eatPrefix.length());
logger.debug("Using base of {} due to eatPrefix of {} chopped from incoming {}", base, eatPrefix, theFile);
}
if (holdingArea != null) {
return new File(holdingArea + "/" + base);
}
return null;
}
/**
* Action to move the file to inProcess area when taking ownership
*
* @param source the file to be renamed
* @param dest where it should end up, or use the holdingArea if nil
* @return true if renamed, false if not
*/
protected boolean renameToInProcessAreaAs(File source, @Nullable File dest) {
if (holdingArea == null && dest == null) {
logger.warn("Holding area not configured, cannot rename {}", source);
return false;
}
if (dest == null) {
dest = getInProcessFileNameFor(source, null);
} else {
dest.getParentFile().mkdirs();
}
boolean renamed = source.renameTo(dest);
if (renamed) {
logger.debug("{} moved to inProcess area as {}", source.getName(), dest);
} else {
logger.warn("{} could not be moved to inProcess area {}", source.getName(), dest);
}
return renamed;
}
/**
* Action to move the file to the error area due to failure to process
*
* @param theFile the file to move
* @return true if the rename was successful
*/
protected boolean renameFileToErrorArea(File theFile) {
boolean renamed = theFile.renameTo(new File(errorArea, theFile.getName()));
if (renamed) {
logger.warn("{} failed and is moved to the error area", theFile.getName());
} else {
logger.error("{} failed and could not be moved to the error area", theFile.getName());
}
return renamed;
}
/**
* Action to delete the file from the holding area
*
* @param theFile file to delete
*/
protected void deleteFileFromHoldingArea(File theFile) {
boolean deleted = theFile.delete();
if (deleted) {
logger.info("{} processed and deleted", theFile.getName());
} else {
logger.warn("{} processed but could not be deleted", theFile.getName());
}
}
/**
* File was successfully processed, take appropriate action
*
* @param theFile the file that was processed
*/
protected void handleFileSuccess(File theFile) {
handleFileSuccess(theFile, getDoneArea());
}
/**
* File was successfully processed, take appropriate action using specified done area
*
* @param theFile the file that was processed
* @param outputRoot the specified output done area
*/
protected void handleFileSuccess(File theFile, @Nullable String outputRoot) {
if (outputRoot != null) {
logger.debug("Handling file success by moving to doneArea {}", theFile);
renameFileToDoneArea(theFile, outputRoot);
} else if (holdingArea != null) {
logger.debug("Handling file success by deleting from holdingArea {}", theFile);
deleteFileFromHoldingArea(theFile);
} else {
logger.debug("Neither Done nor Holding areas defined, leaving file as {}", theFile.getName());
}
}
/**
* File failed to process, take appropriate action
*
* @param theFile the file that failed
*/
protected void handleFileError(File theFile) {
if (errorArea != null) {
logger.debug("Handling file error case for {}", theFile);
renameFileToErrorArea(theFile);
} else {
logger.warn("There is no error location defined and the file did not process. It is stuck at {}", theFile.getName());
}
}
/**
* Build a data object and handle the data bytes
*
* @param theContent the data bytes
* @param fixedName good short name for the data
* @param theFile where it came from
* @param simpleMode simple flag from the input
* @return true if it works
*/
protected boolean processDataObject(byte[] theContent, String fixedName, File theFile, boolean simpleMode) throws EmissaryException {
IBaseDataObject d = DataObjectFactory.getInstance(new Object[] {theContent, fixedName});
return processDataObject(d, fixedName, theFile, simpleMode);
}
/**
* Set up the dataobject and send it on the way
*
* @param d the nascent data object
* @param fixedName the short name of it
* @param theFile where it came from
* @param simpleMode simple flag from the input
* @return true if it works
*/
protected boolean processDataObject(IBaseDataObject d, String fixedName, File theFile, boolean simpleMode) throws EmissaryException {
String currentForm = d.popCurrentForm();
if (currentForm == null) {
// Add our stuff to the form stack if none is set (e.g from DecomposedSession)
for (int j = initialFormValues.size() - 1; j >= 0; j--) {
d.pushCurrentForm(initialFormValues.get(j));
}
} else {
d.pushCurrentForm(currentForm);
}
d.setParameter("SIMPLE_MODE", Boolean.toString(simpleMode));
dataObjectCreated(d, theFile);
logger.info("**Deploying an agent for {} and object {} forms={} simple={}", fixedName, d.getInternalId(), d.getAllCurrentForms(),
(simpleMode ? "simple" : ""));
assignToPooledAgent(d, -1L);
return true;
}
/**
* Parse out sessions and process data from a file
*
* @param theFile file to process
* @param fixedName the good short name of the file
* @return count of sessions parsed
* @throws IOException If there is some I/O problem.
*/
public int processSessions(File theFile, String fixedName) throws IOException, ParserException {
// We are going to prefer a RAF parser if one
// is available so start by getting the file opened
logger.debug("PickUpPlace: Starting on {}", theFile.getName());
int sessionNum = 0;
try (RandomAccessFile raf = new RandomAccessFile(theFile, "r")) {
// Get the right type of session parser
SessionParser sp = parserFactory.makeSessionParser(raf.getChannel());
logger.debug("Using session parser from raf ident {}", sp.getClass().getName());
// .. and a session producer to crank out the data objects...
SessionProducer dof = new SessionProducer(sp, myKey, null);
long fileStart = System.currentTimeMillis();
long totalSize = 0;
// For each session get a data object from the producer
while (true) {
long sessionStart = System.currentTimeMillis();
try {
// Use filename-xx for defualt name
String sessionName = fixedName + "-" + (sessionNum + 1);
IBaseDataObject dataObject = dof.getNextSession(sessionName);
logger.debug("Pulled session {} from {} shortName={}", sessionName, theFile.getName(), dataObject.shortName());
sessionNum++;
long sessionEnd = System.currentTimeMillis();
totalSize += dataObject.data().length;
logger.info("sessionParseMetric:{},{},{},{},{},{}", sessionEnd - sessionStart, sp.getClass().getName(), theFile, sessionName,
sessionNum, dataObject.data().length);
processDataObject(dataObject, sessionName, theFile, false);
} catch (ParserEOFException eof) {
// expected at end of file
long fileEnd = System.currentTimeMillis();
logger.info("fileParseMetric:{},{},{},{},{}", fileEnd - fileStart, sp.getClass().getName(), theFile, sessionNum, totalSize);
break;
} catch (EmissaryException ex) {
logger.error("Could not dispatch {}", theFile.getName(), ex);
throw new ParserException("Could not process" + theFile.getName(), ex);
}
} // end while(true)
}
logger.debug("Done processing {} sessions from {}", sessionNum, theFile.getName());
return sessionNum;
}
/**
* Parse out sessions and process data from a byte array
*
* @param data the bytes to process
* @param fixedName the good short name of the file
* @param theFile file object representing path data belongs to
* @return count of sessions parsed
*/
public int processSessions(byte[] data, String fixedName, File theFile) {
// We are going to prefer a byte array parser since
// the data is already in memory
logger.debug("PickUpPlace: Starting on {}", theFile.getName());
int sessionNum = 0;
// Get the right type of session parser
SessionParser sp = parserFactory.makeSessionParser(InMemoryChannelFactory.create(data).create());
// .. and a session producer to crank out the data objects...
SessionProducer dof = new SessionProducer(sp, myKey, null);
// For each session get a data object from the producer
while (true) {
try {
// Use filename-xx for default name
String sessionName = fixedName + "-" + (sessionNum + 1);
IBaseDataObject dataObject = dof.getNextSession(sessionName);
sessionNum++;
processDataObject(dataObject, sessionName, theFile, false);
} catch (ParserEOFException eof) {
// expected at end of file
break;
} catch (EmissaryException ex) {
logger.error("Could not dispatch {}", fixedName, ex);
}
} // end while(true)
logger.debug("Done processing {} sessions from {}", sessionNum, theFile.getName());
return sessionNum;
}
/**
* Produce a legal tracking filename from the disk filename
*
* @return fixed filename
*/
@Nullable
protected String fixFileName(@Nullable String v) {
if (v == null) {
return null;
}
String s = v.replace(' ', '_');
s = s.replace('\t', '_');
s = s.replace('\n', '_');
s = s.replace('\r', '_');
s = s.replace('\f', '_');
s = s.replace(':', '_');
if (s.startsWith(".")) {
s = "_dot_" + s.substring(1);
}
return s;
}
/**
* Retrieve and agent from the pool and assign the payload to it
*
* @param payload the payload for the agent
* @param timeoutMs maximum time in millis to wait for an agent from the pool. Set to -1 to wait forever. The specified
* time will not be strictly observed because the pool itself blocks for a configurable amount of time when
* requesting an agent. We will wait no more than the specified timeoutMs + the configured pool timeout value.
* @throws EmissaryException when an agent cannot be obtained
*/
public void assignToPooledAgent(IBaseDataObject payload, long timeoutMs) throws EmissaryException {
assignToPooledAgent(payload, agentPool, this, timeoutMs);
}
/**
* Retrieve and agent from the specified pool and assign the payload to it
*
* @param payload the payload for the agent
* @param agentPool the pool of agents
* @param startingLocation the agent launch point
* @param timeoutMs maximum time in millis to wait for an agent from the pool. Set to -1 to wait forever. The specified
* time will not be strictly observed because the pool itself blocks for a configurable amount of time when
* requesting an agent. We will wait no more than the specified timeoutMs + the configured pool timeout value.
* @return mobile agent assigned to pool
* @throws EmissaryException when an agent cannot be obtained
*/
public static IMobileAgent assignToPooledAgent(IBaseDataObject payload, @Nullable AgentPool agentPool, IServiceProviderPlace startingLocation,
long timeoutMs) throws EmissaryException {
IMobileAgent agent = null;
long startTime = System.currentTimeMillis();
boolean warningGiven = false;
int loopCount = 0;
MDC.put(MDCConstants.SHORT_NAME, payload.shortName());
try {
if (agentPool == null) {
agentPool = AgentPool.lookup();
}
do {
loopCount++;
try {
agent = agentPool.borrowAgent();
} catch (Exception e) {
if (!warningGiven) {
slogger.debug("Cannot get agent from pool, trying again ", e);
warningGiven = true;
}
}
} while (agent == null && (timeoutMs < 0 || (startTime + timeoutMs) < System.currentTimeMillis()));
if (agent == null) {
throw new EmissaryException("No agent found for " + payload.shortName() + " after " + loopCount + " tries.");
} else if (loopCount > 1) {
slogger.info("Found agent after {} tries", loopCount);
}
agent.go(payload, startingLocation);
Thread.yield();
} finally {
MDC.remove(MDCConstants.SHORT_NAME);
}
return agent;
}
public static boolean implementsPickUpPlace(Class<? extends Object> clazz) {
return ClassComparator.isaImplementation(clazz, IPickUpPlace.class);
}
}