Job Module

Last modified by Mathieu Pace on 2024/12/19 11:45

A set of APIs to easily communicate with an asynchronous task

Type: JAR
Developed by: XWiki Development Team
License: GNU Lesser General Public License 2.1
Bundled With: XWiki Standard
Compatibility: Since 4.0 Milestone 1

Description

The goal of this module is to make it easier to set up two-way communication with a task executed in a background thread.

A minimal job provides:

  • progress information
  • live log
  • Java bean based question/answer system

and can be extended as long as the client of the API knows the extended interface.

Use an existing job

The common way to use a job is through the org.xwiki.job.JobExecutor component since 6.1, and through the org.xwiki.job.JobManager component before 6.1.

org.xwiki.job.JobManager executes all jobs one by one in a single thread. For other use cases you can either create your own JobManager component or use org.xwiki.job.JobExecutor starting with XWiki 6.1.

DefaultRequest jobRequest = new DefaultRequest();

// Set an id to be able to access the status of this unique task; the id is required to save/retrieve the status of the job
jobRequest.setId("taskid");
// Allow the job to ask questions during its execution (false by default)
jobRequest.setInteractive(true);
// [since 5.4] Indicate if the job should log information about what is going on (to display it during the process for example, true by default)
jobRequest.setVerbose(true);
// [since 10.0] Force the serialization (or not) of the job status, null (the default) to fall back on {@link JobStatus#isSerialized()} (which is usually true)
jobRequest.setStatusSerialized(true);
// [since 10.0] Force the isolation (or not) of the job log from standard output, null (the default) to fall back on {@link JobStatus#isIsolated()} (which is usually true)
jobRequest.setStatusLogIsolated(true);
// Put some custom configuration
jobRequest.setProperty("some custom jobtype related parameter", "value");
// [since 10.10] Set context entries (current request, current user, etc)
// this.contextStoreManager is a component of role "org.xwiki.context.concurrent.ContextStoreManager".
Map<String, Serializable> contextEntries = this.contextStoreManager.save(Arrays.asList("request.*", "action"));
jobRequest.setContext(contextEntries);

// Look up the Job with role hint "jobtype", add it to the queue and return right away
Job job = jobExecutor.execute("jobtype", jobRequest);

// Wait until the job is finished
job.join();

You can check valid Job context entries.
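
To illustrate what "add to the queue and return right away" followed by join() means in practice, here is a minimal standalone sketch that uses only java.util.concurrent. It does not use the XWiki API: the class SingleThreadQueueSketch and its methods are hypothetical stand-ins, and a single-thread executor plays the role of a queue that runs jobs one by one.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for a job queue; NOT the XWiki JobExecutor/JobManager API.
class SingleThreadQueueSketch
{
    // One worker thread: submitted tasks run one by one, in submission order.
    private final ExecutorService queue = Executors.newSingleThreadExecutor();

    private final List<String> log = Collections.synchronizedList(new ArrayList<>());

    // Queue a task and return right away with a handle that can be waited on.
    Future<?> execute(String id)
    {
        return this.queue.submit(() -> {
            this.log.add(id + " started");
            this.log.add(id + " finished");
        });
    }

    List<String> getLog()
    {
        return new ArrayList<>(this.log);
    }

    void shutdown()
    {
        this.queue.shutdown();
    }

    static List<String> demo()
    {
        SingleThreadQueueSketch sketch = new SingleThreadQueueSketch();
        try {
            Future<?> first = sketch.execute("job1");
            Future<?> second = sketch.execute("job2");
            // Rough equivalent of job.join(): block until each task is finished
            first.get();
            second.get();
            return sketch.getLog();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            sketch.shutdown();
        }
    }
}
```

Because there is a single worker thread, the second task only starts once the first one has finished, whatever the caller does in the meantime.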

Create a reusable job

To register a new job that anyone can then use in JobExecutor (or JobManager in older versions) you need to register a component with role org.xwiki.job.Job and a unique role hint.

Since 7.4 a public org.xwiki.job.AbstractJob is provided to help implement a Job. An org.xwiki.job.internal.AbstractJob class had existed since 5.0, but it was internal.
org.xwiki.job.AbstractJob takes care of a lot of plumbing, among other things:

  • it automatically stores anything logged (the standard slf4j way) in the status
  • it automatically listens to events generated through the org.xwiki.job.event.status.JobProgressManager component and updates the progress in the job status
  • it deals with question/answer complexity (waiting for an answer, etc.)
  • it automatically stores the job status asynchronously when the job is finished
  • it logs and generates related events when the job starts and when the job ends
  • it implements #join()
  • it maintains a tree of jobs in the context
  • since 10.2, it implements CancelableJobStatus. Cancellation is disabled by default; to indicate that your job is cancelable, call AbstractJobStatus#setCancelable

Grouped Job

Since 6.1 a Job can also implement org.xwiki.job.GroupedJob to indicate which group this job is part of. 

In a group, jobs are executed in the same thread pool by JobExecutor.
Since 12.5RC1 it's possible to provide a dedicated configuration for the thread pool of a specific GroupedJob, for example to choose the size of the pool or the default priority of the threads for those jobs. To provide that configuration, declare a new component that implements org.xwiki.job.GroupedJobInitializer and whose getId method returns the JobGroupPath of the GroupedJob.

Note that since the JobGroupPath defines a hierarchy of GroupedJobs, it is possible to use one configuration for all the thread pools of a hierarchy by returning in GroupedJobInitializer#getId a JobGroupPath matching a specific node of the hierarchy.

So for example, you could have two GroupedJobs using the following JobGroupPaths:

  • myExtension/groupedJob1
  • myExtension/groupedJob2

and a dedicated GroupedJobInitializer with getId returning the JobGroupPath myExtension: this initializer would be used for both GroupedJobs.
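
The hierarchical matching described above can be pictured with a small standalone sketch. It is a simplified model and not the XWiki implementation: the class GroupPathLookupSketch, its methods, and the pool sizes are hypothetical, and walking from the most specific path up to the root is an assumption drawn from the description above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the lookup; NOT the XWiki GroupedJobInitializer machinery.
class GroupPathLookupSketch
{
    // Pool size registered per group path, for example [myExtension] -> 4
    private final Map<List<String>, Integer> poolSizes = new HashMap<>();

    private final int defaultPoolSize;

    GroupPathLookupSketch(int defaultPoolSize)
    {
        this.defaultPoolSize = defaultPoolSize;
    }

    // Plays the role of an initializer whose id is the given path
    void register(List<String> path, int poolSize)
    {
        this.poolSizes.put(new ArrayList<>(path), poolSize);
    }

    // Walk from the most specific path up to the root and return the first match
    int resolvePoolSize(List<String> path)
    {
        for (int length = path.size(); length > 0; --length) {
            Integer size = this.poolSizes.get(path.subList(0, length));
            if (size != null) {
                return size;
            }
        }

        // No initializer registered anywhere in the hierarchy
        return this.defaultPoolSize;
    }
}
```

With a single entry registered for the path myExtension, both myExtension/groupedJob1 and myExtension/groupedJob2 resolve to that entry, while an unrelated path falls back to the default.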

Execute custom job

JobExecutor provides an API to execute a custom instance of Job.

jobExecutor.execute(job);

Starting with 5.1, it's also possible to create a custom Job and add it to the queue using JobManager#addJob(Job job).

Notify about progress

It's possible to report the progress of a task by sending progress events, which are then received by the job (or jobs) to update its progress information.

In Velocity scripts

[Since 7.1]

## Push 2 sub steps
$services.progress.pushLevel(2)

## First step
$services.progress.startStep('First step')
#firstStep()
$services.progress.endStep()

## Second step
$services.progress.startStep('Second step')
#secondStep()
$services.progress.endStep()

$services.progress.popLevel()

In Java

Since 7.1

class MyComponent
{
  @Inject
  private JobProgressManager progressManager;

  void foo()
  {
    // Push 2 sub steps
    this.progressManager.pushLevelProgress(2, this);

    try {
      // First step
      this.progressManager.startStep(this, "First step");
      firstStep();
      this.progressManager.endStep(this);

      // Second step
      this.progressManager.startStep(this, "Second step");
      secondStep();
    } finally {
      // Done, go back to parent progress level
      this.progressManager.popLevelProgress(this);
    }
  }

  void firstStep()
  {
    this.progressManager.call(() -> {
      // First sub-step
      this.progressManager.startStep(this);
      firstSubStep();
      this.progressManager.endStep(this);

      // Second sub-step
      this.progressManager.startStep(this);
      secondSubStep();

      return null;
    }, 2, this);
  }
}

Between 6.1 and 7.0:

class MyComponent
{
  @Inject
  private JobProgressManager progressManager;

  void foo()
  {
    // Push 2 sub steps
    this.progressManager.pushLevelProgress(2, this);

    try {
      firstStep();

      // First step done
      this.progressManager.stepPropress(this);

      secondStep();
    } finally {
      // Done, go back to parent progress level
      this.progressManager.popLevelProgress(this);
    }
  }

  void firstStep()
  {
    // Since 7.1M1 Callable support has been added
    this.progressManager.call(new Callable<Void>()
    {
      @Override
      public Void call() throws Exception
      {
        firstSubStep();

        // First sub-step done (MyComponent.this keeps the same source as the enclosing call)
        progressManager.stepPropress(MyComponent.this);

        secondSubStep();

        return null;
      }
    }, 2, this);
  }
}

Before 6.1:

class MyComponent
{
  @Inject
  private ObservationManager observationManager;

  protected void notifyPushLevelProgress(int steps)
  {
    this.observationManager.notify(new PushLevelProgressEvent(steps), this);
  }

  protected void notifyStepPropress()
  {
    this.observationManager.notify(new StepProgressEvent(), this);
  }

  protected void notifyPopLevelProgress()
  {
    this.observationManager.notify(new PopLevelProgressEvent(), this);
  }

  void foo()
  {
    // Push 2 sub steps
    notifyPushLevelProgress(2);

    try {
      firstStep();

      // First step done
      notifyStepPropress();

      secondStep();
    } finally {
      // Done, go back to parent progress level
      notifyPopLevelProgress();
    }
  }
}

Command line

An example of a command-line installJob is available on the XWikiRESTfulAPI page.

Job status

Each job exposes a live status, which is also automatically serialized when the job is finished (if the job has an id).

Progress

The job progress provides data to display live information about the job, such as a progress bar.
It also provides a tree of steps with an associated message and elapsed time for each one.
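
As an illustration of how nested progress levels can be folded into a single value for a progress bar, here is a simplified standalone model. It is not the XWiki job progress implementation: the class ProgressTreeSketch and all of its methods are hypothetical, and it assumes that every step of a level weighs the same.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of nested progress levels; NOT the XWiki implementation.
class ProgressTreeSketch
{
    private static final class Level
    {
        final int steps;
        int done;
        boolean stepOpen;
        Level child;

        Level(int steps)
        {
            this.steps = steps;
        }

        // Fraction of this level that is complete, between 0 and 1
        double offset()
        {
            double current = this.stepOpen && this.child != null ? this.child.offset() : 0.0;
            return Math.min(1.0, (this.done + current) / this.steps);
        }
    }

    // The root is a single implicit step that contains everything else
    private final Level root = new Level(1);

    private final Deque<Level> stack = new ArrayDeque<>();

    ProgressTreeSketch()
    {
        this.root.stepOpen = true;
        this.stack.push(this.root);
    }

    // Split the current step into the given number of sub steps
    void pushLevel(int steps)
    {
        Level parent = this.stack.peek();
        parent.stepOpen = true;
        Level level = new Level(steps);
        parent.child = level;
        this.stack.push(level);
    }

    // Start the next step of the current level, closing the previous one if needed
    void startStep()
    {
        endStep();
        this.stack.peek().stepOpen = true;
    }

    void endStep()
    {
        Level level = this.stack.peek();
        if (level.stepOpen) {
            level.done++;
            level.stepOpen = false;
            level.child = null;
        }
    }

    // Mark the current level as complete and go back to the parent level
    void popLevel()
    {
        if (this.stack.size() > 1) {
            Level level = this.stack.pop();
            level.done = level.steps;
            level.stepOpen = false;
            level.child = null;
        }
    }

    double getOffset()
    {
        return this.root.offset();
    }
}
```

In this model, finishing the first of two sub-steps inside the first of two steps yields an overall offset of 0.25, and popping the outermost level brings it to 1.0.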

Events

The following notification events are fired during the execution of a job whose status extends the default job status:

  • JobStartedEvent: fired when the job starts. The event is sent with the following parameters:
    • source: the job that has started
    • data: null
  • QuestionAskedEvent: fired when an interactive job asks a question. The event listeners have the chance to answer the question before it reaches the user. The event is sent with the following parameters:
    • source: the status of the job that asked the question, which can be used to access the question
    • data: null
  • QuestionAnsweredEvent: fired after a question raised by an interactive job is answered. The event is sent with the following parameters:
    • source: the job status, which can be used to access the question
    • data: null
  • JobFinishedEvent: fired when the job finishes. The event is sent with the following parameters:
    • source: the job that has finished
    • data: a Throwable if the job execution failed or null otherwise
  • [since 8.1] JobFinishingEvent: fired just after a job is done, when the status starts being saved, etc. It gives listeners a chance to act as part of the job execution before it's "closed". The event is sent with the following parameters:
    • source: the job that has finished
    • data: a Throwable if the job execution failed or null otherwise
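
The sketch below models the (event, source, data) triple for the start and end events, to show how a listener might tell a failed job from a successful one. It is a standalone illustration and not the XWiki Observation API: the JobEventSketch class and its Listener interface are hypothetical, events are represented by plain strings, and firing JobFinishingEvent before JobFinishedEvent is an assumption based on the descriptions above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of job event notification; NOT the XWiki Observation API.
class JobEventSketch
{
    // Mirrors the (event, source, data) triple described above
    interface Listener
    {
        void onEvent(String event, Object source, Object data);
    }

    // Run the task and notify the listener when it starts and when it ends
    static void run(Object job, Runnable task, Listener listener)
    {
        listener.onEvent("JobStartedEvent", job, null);

        Throwable error = null;
        try {
            task.run();
        } catch (RuntimeException e) {
            // The failure is passed to listeners as the event data
            error = e;
        }

        listener.onEvent("JobFinishingEvent", job, error);
        listener.onEvent("JobFinishedEvent", job, error);
    }

    // Record what a listener would see for the given task
    static List<String> record(Runnable task)
    {
        List<String> seen = new ArrayList<>();
        run("myJob", task, (event, source, data) ->
            seen.add(event + " data=" + (data == null ? "null" : data.getClass().getSimpleName())));

        return seen;
    }
}
```

A listener interested in failures only needs to check whether the data it receives with the finishing or finished event is a Throwable.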

Storage

If the job has an identifier and serialization is not disabled, the JobStatus Java object is serialized as XML on disk and is, by default, located in the <permdir>/jobs/status/<id/of/the/job>/ folder.

The status object itself is stored in a status.xml file before XWiki 13.7, and zipped in a status.xml.zip file since XWiki 13.7.
The status log is stored live (it used to be kept in memory and then stored at the end) in a separate file (log.xml).
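
As a rough illustration of the default layout, the following standalone sketch derives the status folder from a permanent directory and a job id. The class JobStatusFolderSketch is hypothetical, and it assumes that each element of the id is used as-is as a folder name; the actual implementation may encode the id elements differently.

```java
import java.nio.file.Path;
import java.util.List;

// Hypothetical helper; NOT part of the XWiki API.
class JobStatusFolderSketch
{
    // Build <permdir>/jobs/status/<id/of/the/job>
    static Path statusFolder(Path permanentDirectory, List<String> jobId)
    {
        Path folder = permanentDirectory.resolve("jobs").resolve("status");

        // Assumption: one sub folder per element of the job id, without any encoding
        for (String element : jobId) {
            folder = folder.resolve(element);
        }

        return folder;
    }
}
```

For the id ['solr', 'indexer'], this model gives the folder jobs/status/solr/indexer under the permanent directory.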

Job questions

See Job Question and Answer system.

Configuration

#-# [Since 4.0M1]
#-# The folder containing job executing status.
#-# The default is {environment.permanentDirectory}/jobs/
# job.statusFolder=/var/lib/xwiki/data/jobs/

#-# [Since 7.2M2]
#-# The maximum number of entries to put in the job status cache.
#-# The default is 50.
# job.statusCacheSize=50


#-# [Since 12.5RC1]
#-# The maximum number of entries to put in cache for the GroupedJobInitializer components.
#-# The default is 100.
# job.groupedJobInitializerCacheSize=100

#-# [Since 12.5RC1]
#-# The thread keep-alive time in milliseconds for the single job executor.
#-# This value defines how long a thread can be idle before being terminated by the executor.
#-# The default value is 60000 for 60 000ms.
# job.singleJobThreadKeepAliveTime=60000

#-# [Since 12.5RC1]
#-# The thread keep-alive time in milliseconds for the grouped job executors.
#-# This value defines how long a thread can be idle before being terminated by an executor.
#-# The default value is 60000 for 60 000ms.
# job.groupedJobThreadKeepAliveTime=60000
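
To make the keep-alive settings more concrete, here is a minimal sketch of what such a value means for a standard java.util.concurrent.ThreadPoolExecutor. The KeepAliveSketch class is hypothetical, and whether the job executors are configured exactly this way is an assumption; only the JDK calls are real.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of a keep-alive time; NOT the XWiki executor setup.
class KeepAliveSketch
{
    static ThreadPoolExecutor newExecutor(long keepAliveMillis)
    {
        // A single worker thread fed by an unbounded queue
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, keepAliveMillis, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>());

        // Let the worker thread be terminated once it has been idle for longer than the keep-alive time
        executor.allowCoreThreadTimeOut(true);

        return executor;
    }
}
```

With a keep-alive time of 60000, a thread that has had nothing to run for a minute is released, and a new one is created when the next task arrives.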

Script Service

Starting with XWiki 7.2M1 there is a script service available to retrieve:

  • the status of a specified job
  • the current job from a specified group and its status

## The job id is a list of strings which are domain specific.
#set ($farmDistributionJobStatus = $services.job.getJobStatus(['distribution']))
#set ($solrIndexerJobStatus = $services.job.getJobStatus(['solr', 'indexer']))

#set ($currentJobStatus = $services.job.getCurrentJobStatus(['my', 'job', 'group']))
The current job is $currentJobStatus.state

JavaScript API

The xwiki-job-runner JavaScript module can be used to trigger a job on the server side and to wait for it to finish. Here's an example:

require(['xwiki-job-runner'], function(JobRunner) {
  Promise.resolve(new JobRunner({
    createStatusRequest: jobId => {
      return {
        // Specify the URL that can be used to fetch the job status and progress. The response should be a JSON job
        // object that has at least 3 properties: id, state and progress.
        url: jobStatusURL,
        data: {
          // Data to pass when requesting the job status. For instance:
          jobId: jobId.join('/')
        }
      };
    }
  // Specify the URL that can be used to trigger the server-side job. The response should be a JSON job object that has
  // at least 3 properties: id, state and progress.
  }).run(jobTriggerURL, {
    // Data needed to start the job (usually job request data).
  })).then(job => {
    // The promise resolves when the job ends. It's up to you to detect if the job finished successfully or not (usually
    // by checking if error messages were logged during the execution of the job).
    console.log('Job finished!');
  });
});
