Jobs

jobs.base

ZeroG BaseJobSchema and BaseJob class definitions

exception zerog.jobs.base.ErrorContinue

Bases: Exception

exception zerog.jobs.base.ErrorFinish

Bases: Exception

exception zerog.jobs.base.WarningContinue

Bases: Exception

exception zerog.jobs.base.WarningFinish

Bases: Exception

class zerog.jobs.base.BaseJobSchema(*, only: Optional[Union[Sequence[str], Set[str]]] = None, exclude: Union[Sequence[str], Set[str]] = (), many: bool = False, context: Optional[Dict] = None, load_only: Union[Sequence[str], Set[str]] = (), dump_only: Union[Sequence[str], Set[str]] = (), partial: Union[bool, Sequence[str], Set[str]] = False, unknown: Optional[str] = None)

Bases: marshmallow.schema.Schema

BaseJob persisted attributes

Variables
  • documentType (str) – used to create datastore key

  • jobType (str) – used to get registered job schema

  • schemaVersion (float) – used to update schemas

  • createdAt (datetime) – time job was created

  • updatedAt (datetime) – time job was last updated

  • cas (str) – used to prevent job update collisions

  • uuid (str) – unique job identifier

  • logId (str) – job id to show in logs

  • queueName (str) – name of queue for job

  • queueKwargs (dict) – keyword args used to enqueue job

  • queueJobId (int) – id of job in queue

  • events (list) – list of logged events for job

  • errors (list) – list of logged errors for job

  • warnings (list) – list of logged warnings for job

  • running (boolean) – True if job is currently running

  • errorCount (int) – number of times job has had an exception

  • completeness (float) – fraction of the job completed, from 0.0 to 1.0

  • tickcount (float) – record of completeness ticks

  • tickval (float) – completeness increment per tick

  • resultCode (int) – job resultCode, -1 if incomplete, 200 for success

class zerog.jobs.base.BaseJob(datastore, queue, keepalive=None, **kwargs)

Bases: abc.ABC

The base class for all ZeroG jobs.

Variables
  • DOCUMENT_TYPE (str) – document type used to make the datastore key. NEVER override this attribute.

  • SCHEMA_VERSION (str) – version which can be used to manage schema changes. You MAY override this attribute.

  • SCHEMA (class) – the marshmallow schema used to serialize/deserialize this job. You MAY override this attribute to add fields to the base schema. The schema must be a subclass of BaseJobSchema.

  • JOB_TYPE (str) – a unique string identifying this type of job. You MUST override this attribute.

  • MAX_ERRORS (int) – maximum number of error retries before the job fails. You MAY override this attribute.

Subclasses MUST

  • override the run() method

__init__(datastore, queue, keepalive=None, **kwargs)

Initialize the job with deserialized data.

Subclasses MUST override this method if they use a subclass of BaseJobSchema to add fields.

If overriding this method, you MUST call the parent __init__() using super().

This __init__() method is the opportunity to load any extra fields that are declared in the associated schema.

Required fields can be loaded directly by referencing their key.

Optional fields need to be loaded using the dictionary’s get method, which gives an opportunity to load the field with a default value if it isn’t present in the input data.

Example:

def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)

    self.requiredField = kwargs['requiredField']
    self.optionalField = kwargs.get('optionalField', "default")

dump()

Serialize the job according to the job schema.

Returns

serialized job data

Return type

native Python data types

dumps(**kwargs)

Serialize the job according to the job schema.

Returns

serialized job data

Return type

JSON-encoded string

key()

Constructs the datastore key for this job.

Returns

datastore key for this job

Return type

str

save()

Saves job instance to the datastore. Fails if job was updated in the datastore since this instance was last updated.

Returns

None

reload()

Reload job data from the datastore and update this instance.

This may be necessary if another Python instance has updated this job in the datastore and the cas loaded here is out of date.

Returns

None
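The interplay of save() and reload() can be pictured as optimistic concurrency on the cas value. The following is a minimal sketch, not ZeroG's implementation: FakeDatastore, StaleWriteError, and JobSketch are hypothetical stand-ins, and the integer cas and the exception type are assumptions made for illustration.

```python
import itertools


class StaleWriteError(Exception):
    """Hypothetical error raised when the stored cas no longer matches."""


class FakeDatastore:
    """In-memory stand-in for a datastore with compare-and-swap writes."""

    def __init__(self):
        self._docs = {}                       # key -> (cas, data)
        self._cas_counter = itertools.count(1)

    def read(self, key):
        return self._docs[key]

    def write(self, key, data, cas=None):
        current = self._docs.get(key)
        # Reject the write if another instance updated the document first.
        if current is not None and current[0] != cas:
            raise StaleWriteError(key)
        new_cas = next(self._cas_counter)
        self._docs[key] = (new_cas, dict(data))
        return new_cas


class JobSketch:
    """Hypothetical job holding only the fields needed for this sketch."""

    def __init__(self, datastore, key):
        self.datastore = datastore
        self._key = key
        self.cas = None
        self.running = False

    def save(self):
        data = {"running": self.running}
        self.cas = self.datastore.write(self._key, data, cas=self.cas)

    def reload(self):
        self.cas, data = self.datastore.read(self._key)
        self.running = data["running"]


store = FakeDatastore()
first = JobSketch(store, "job-1")
first.save()                     # initial write succeeds

second = JobSketch(store, "job-1")
second.reload()                  # second instance picks up the current cas
second.running = True
second.save()                    # advances the stored cas

try:
    first.save()                 # first instance still holds the old cas
    stale = False
except StaleWriteError:
    stale = True
    first.reload()               # refresh cas and data, then saving works
    first.save()
```

In this sketch the stale instance recovers by reloading before it saves again, which is the situation reload() is described as handling.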

update_attrs(**kwargs)

Updates a job’s attributes. Only updates the attributes specified in the keyword arguments. Saves the update to the datastore.

Example:

self.update_attrs(queueJobId=10, running=True)

Returns

None

record_event(msg)

Records an event in the job’s events list.

Parameters

msg (str) – the event message

Returns

None

record_warning(msg)

Records a warning in the job’s warnings list.

Parameters

msg (str) – the warning message

Returns

None

record_error(errorCode, msg, exception=None)

Records an error in the job’s errors list.

The exception keyword argument is passed so that a job can override this method to add extra error handling for specific exceptions.

Parameters
  • errorCode (int) – an error code associated with the error

  • msg (str) – the error message

  • exception (object) – the exception that caused the error if the error is the result of an unhandled exception

Returns

None
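An override of record_error() that reacts to one specific exception type might look like the sketch below. BaseJobStandIn is a hypothetical stand-in for BaseJob so that the example runs on its own; the shape of the stored error entries and the FetchJob class are assumptions, not part of ZeroG.

```python
class BaseJobStandIn:
    """Hypothetical stand-in for BaseJob; only record_error is modeled."""

    def __init__(self):
        self.errors = []

    def record_error(self, errorCode, msg, exception=None):
        # Assumed storage format, chosen only for this illustration.
        self.errors.append({"errorCode": errorCode, "msg": msg})


class FetchJob(BaseJobStandIn):
    """Hypothetical job that counts timeouts separately."""

    def __init__(self):
        super().__init__()
        self.timeouts = 0

    def record_error(self, errorCode, msg, exception=None):
        # Extra handling for one specific exception type.
        if isinstance(exception, TimeoutError):
            self.timeouts += 1
            msg = f"{msg} (timeout #{self.timeouts})"
        # Defer to the parent so the error is still recorded.
        super().record_error(errorCode, msg, exception=exception)


job = FetchJob()
job.record_error(504, "upstream fetch failed", exception=TimeoutError())
job.record_error(500, "unexpected failure", exception=ValueError("bad"))
```

Calling the parent method last keeps the base recording behavior intact while the subclass adds its own bookkeeping.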

record_result(resultCode)

Record the result of a job. This method is called by the base worker when a job completes, so it does not need to be explicitly called in most cases.

job_log_info(msg)

Records an event in the job’s events list and logs it using the current Python logger.

Parameters

msg (str) – the event message

Returns

None

job_log_warning(msg)

Records a warning in the job’s warnings list and logs it using the current Python logger.

Parameters

msg (str) – the warning message

Returns

None

job_log_error(errorCode, msg, exception=None)

Records an error in the job’s errors list and logs it using the current Python logger.

Parameters
  • errorCode (int) – an error code associated with the error

  • msg (str) – the error message

  • exception (object) – the exception that caused the error if the error is the result of an unhandled exception

Returns

None

raise_warning_continue(resultCode, msg)

Interrupts job execution and records a warning. The job may continue after being requeued.

Parameters
  • resultCode (int) – a result code associated with the warning

  • msg (str) – the warning message

raise_warning_finish(resultCode, msg)

Interrupts job execution, records a warning, and terminates the job.

Parameters
  • resultCode (int) – a result code associated with the warning

  • msg (str) – the warning message

raise_error_continue(errorCode, msg)

Interrupts job execution and records an error. The job may continue after being requeued.

Parameters
  • errorCode (int) – an error code associated with the error

  • msg (str) – the error message

raise_error_finish(errorCode, msg)

Interrupts job execution, records an error, and terminates the job.

Parameters
  • errorCode (int) – an error code associated with the error

  • msg (str) – the error message
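The difference between the "continue" and "finish" helpers can be sketched as follows. This is an illustration under stated assumptions, not the ZeroG worker: the exception classes are local stand-ins for zerog.jobs.base.ErrorContinue and ErrorFinish, run_once() is a hypothetical worker step, and both the point at which the error is recorded and the result code reported on termination are guesses.

```python
NO_RESULT = -1  # "incomplete" result code, per the resultCode description


class ErrorContinue(Exception):
    """Local stand-in for zerog.jobs.base.ErrorContinue."""


class ErrorFinish(Exception):
    """Local stand-in for zerog.jobs.base.ErrorFinish."""


class JobSketch:
    """Hypothetical job modeling only the two error helpers."""

    def __init__(self):
        self.errors = []

    def raise_error_continue(self, errorCode, msg):
        self.errors.append((errorCode, msg))   # record, then interrupt
        raise ErrorContinue(msg)

    def raise_error_finish(self, errorCode, msg):
        self.errors.append((errorCode, msg))   # record, then interrupt
        raise ErrorFinish(msg)

    def run(self, fatal):
        if fatal:
            self.raise_error_finish(422, "input cannot be processed")
        self.raise_error_continue(503, "dependency unavailable")


def run_once(job, fatal):
    """Hypothetical worker step returning (resultCode, requeue)."""
    try:
        return job.run(fatal), False
    except ErrorContinue:
        return NO_RESULT, True                 # job may run again later
    except ErrorFinish:
        return job.errors[-1][0], False        # assumed: report the errorCode


retry = run_once(JobSketch(), fatal=False)
stop = run_once(JobSketch(), fatal=True)
```

The warning helpers would follow the same pattern with WarningContinue and WarningFinish.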

set_completeness(completeness)

Sets the absolute value of the job’s completeness. Clamps the value to a range of 0.0 to 1.0.

Parameters

completeness (float) – absolute completeness value

Returns

None

add_to_completeness(delta)

Increment the job’s completeness. Adds any unrecorded ticks. Resulting completeness will be clamped to a range of 0.0 to 1.0.

Parameters

delta (float) – amount by which to increment completeness

Returns

None

set_tick_value(tickval)

Sets the amount by which the job’s completeness will be incremented by each call to the tick method.

Parameters

tickval (float) – amount to increment completeness for each tick

Returns

None

tick()

Accumulates the job’s tickcount. Adds the accumulated tickcount to completeness once tickcount is >= 0.01.

Returns

None
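The completeness bookkeeping described for set_completeness(), add_to_completeness(), set_tick_value(), and tick() can be sketched in a few lines. ProgressSketch is a hypothetical stand-in that models only the arithmetic; persistence to the datastore is left out, and the details of how the real methods share work are assumptions.

```python
class ProgressSketch:
    """Hypothetical stand-in modeling only the completeness arithmetic."""

    FLUSH_THRESHOLD = 0.01  # taken from the tick() description

    def __init__(self):
        self.completeness = 0.0
        self.tickcount = 0.0
        self.tickval = 0.0

    def set_completeness(self, completeness):
        # Clamp to the documented range of 0.0 to 1.0.
        self.completeness = min(1.0, max(0.0, completeness))

    def add_to_completeness(self, delta):
        # Fold in any ticks that have not been recorded yet.
        self.set_completeness(self.completeness + delta + self.tickcount)
        self.tickcount = 0.0

    def set_tick_value(self, tickval):
        self.tickval = tickval

    def tick(self):
        self.tickcount += self.tickval
        if self.tickcount >= self.FLUSH_THRESHOLD:
            self.add_to_completeness(0.0)


progress = ProgressSketch()
progress.set_tick_value(0.004)     # e.g. 250 equal units of work
progress.tick()
progress.tick()                    # 0.008 accumulated, below the threshold
pending = progress.completeness    # nothing has been added yet
progress.tick()                    # threshold reached, ticks are flushed
flushed = progress.completeness
progress.set_completeness(1.7)     # out-of-range values are clamped
```

A job that processes N equal units of work would typically call set_tick_value(1.0 / N) once and tick() after each unit.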

enqueue(**kwargs)

Add a job to its queue.

Sets the job’s queueJobId if enqueueing is successful. Sets it to -1 if enqueueing fails.

Parameters

kwargs (dict) – keyword arguments that will be passed to the queueing client

Returns

None

progress()

Returns a job’s completeness and result.

Returns

current values of completeness and resultCode

Return type

dict

info()

Returns a job’s completeness, result, events, warnings, and errors.

Returns

current values of completeness, resultCode, events, warnings, and errors

Return type

dict
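A caller that waits for a job to finish could poll progress() until resultCode is no longer -1. The sketch below assumes the returned dict uses the keys "completeness" and "resultCode"; wait_for_result() and the fake snapshots are hypothetical and exist only to make the example runnable without a datastore.

```python
import time

NO_RESULT = -1  # resultCode of an incomplete job


def wait_for_result(get_progress, interval=0.0, max_polls=100):
    """Poll a progress()-style callable until resultCode is set."""
    for _ in range(max_polls):
        snapshot = get_progress()
        if snapshot["resultCode"] != NO_RESULT:
            return snapshot
        time.sleep(interval)
    raise TimeoutError("job did not finish within max_polls")


# Fake snapshots standing in for successive job.progress() calls.
fake_snapshots = iter([
    {"completeness": 0.2, "resultCode": NO_RESULT},
    {"completeness": 0.7, "resultCode": NO_RESULT},
    {"completeness": 1.0, "resultCode": 200},
])
final = wait_for_result(lambda: next(fake_snapshots))
```

With a real job, get_progress would need to reload the job from the datastore before reading its progress, since another process performs the updates.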

get_data()

Returns result data for this job.

Override this method if the job needs to return data once it is complete.

Returns

output data

Return type

dict

continue_running()

Called by the worker after a job is interrupted by an exception, to determine whether the job should be requeued to continue running.

The default is to terminate after self.MAX_ERRORS errors have been recorded.

Override this method as needed for more complex error handling.

Returns

NO_RESULT (-1) if the job should continue. INTERNAL_ERROR (500) if the job should terminate

Return type

int
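The default retry decision can be sketched as a comparison of errorCount against MAX_ERRORS. RetryPolicySketch is a hypothetical stand-in; the exact comparison (>= rather than >) and the MAX_ERRORS values are assumptions, not the library's defaults.

```python
NO_RESULT = -1         # job should continue
INTERNAL_ERROR = 500   # job should terminate


class RetryPolicySketch:
    """Hypothetical stand-in modeling only the retry decision."""

    MAX_ERRORS = 3  # illustrative value only

    def __init__(self):
        self.errorCount = 0

    def continue_running(self):
        # Assumption: the job stops once errorCount reaches MAX_ERRORS.
        if self.errorCount >= self.MAX_ERRORS:
            return INTERNAL_ERROR
        return NO_RESULT


class PatientJob(RetryPolicySketch):
    """Tolerates more errors by overriding the class attribute."""

    MAX_ERRORS = 10


job = RetryPolicySketch()
decisions = []
for _ in range(4):
    job.errorCount += 1
    decisions.append(job.continue_running())

patient = PatientJob()
patient.errorCount = 3
patient_decision = patient.continue_running()
```

Raising MAX_ERRORS covers the simple case; overriding continue_running() itself is only needed when the decision depends on more than the error count.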

abstract run()

This method MUST be overridden.

This method executes the job. It is called by the base worker once the job has been successfully loaded.

Returns

resultCode for the job. Return NO_RESULT if job needs to be requeued for further processing. Otherwise use HTTP resultCodes (200s for success, etc.)
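A run() implementation that works in batches and asks to be requeued might look like the sketch below. BaseJobStandIn is a hypothetical abstract class used in place of BaseJob so that the example is self-contained; BatchJob, its JOB_TYPE string, and the in-memory state are assumptions. A real job would keep its state in schema fields so that it survives requeueing.

```python
from abc import ABC, abstractmethod

NO_RESULT = -1  # returned when the job needs further processing


class BaseJobStandIn(ABC):
    """Hypothetical stand-in for BaseJob; only run() is modeled."""

    JOB_TYPE = None

    @abstractmethod
    def run(self):
        ...


class BatchJob(BaseJobStandIn):
    JOB_TYPE = "batch_example"   # hypothetical job type string

    def __init__(self, items, batch_size=2):
        self.pending = list(items)
        self.done = []
        self.batch_size = batch_size

    def run(self):
        # Process one batch per invocation.
        batch = self.pending[:self.batch_size]
        self.pending = self.pending[self.batch_size:]
        self.done.extend(item.upper() for item in batch)
        # Ask to be requeued while work remains, otherwise report success.
        return NO_RESULT if self.pending else 200


job = BatchJob(["a", "b", "c"])
codes = [job.run(), job.run()]
```

Because run() is abstract in the stand-in, instantiating a subclass that does not override it raises TypeError, which mirrors the "MUST be overridden" requirement.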

zerog.jobs.base.make_key(uuid)

Makes a unique datastore key for a job.

Parameters

uuid (str) – uuid of the job

Returns

datastore key

Return type

str

jobs.event

Copyright (c) 2017 MotiveMetrics. All rights reserved.

class zerog.jobs.event.EventSchema(*, only: Optional[Union[Sequence[str], Set[str]]] = None, exclude: Union[Sequence[str], Set[str]] = (), many: bool = False, context: Optional[Dict] = None, load_only: Union[Sequence[str], Set[str]] = (), dump_only: Union[Sequence[str], Set[str]] = (), partial: Union[bool, Sequence[str], Set[str]] = False, unknown: Optional[str] = None)

Bases: marshmallow.schema.Schema

Schema for recording an event that occurred while processing a job

jobs.warning

Copyright (c) 2020 MotiveMetrics. All rights reserved.

class zerog.jobs.warning.WarningSchema(*, only: Optional[Union[Sequence[str], Set[str]]] = None, exclude: Union[Sequence[str], Set[str]] = (), many: bool = False, context: Optional[Dict] = None, load_only: Union[Sequence[str], Set[str]] = (), dump_only: Union[Sequence[str], Set[str]] = (), partial: Union[bool, Sequence[str], Set[str]] = False, unknown: Optional[str] = None)

Bases: marshmallow.schema.Schema

Schema for recording a warning that occurred while processing a job

jobs.error

ZeroG ErrorSchema and Error class definitions

class zerog.jobs.error.ErrorSchema(*, only: Optional[Union[Sequence[str], Set[str]]] = None, exclude: Union[Sequence[str], Set[str]] = (), many: bool = False, context: Optional[Dict] = None, load_only: Union[Sequence[str], Set[str]] = (), dump_only: Union[Sequence[str], Set[str]] = (), partial: Union[bool, Sequence[str], Set[str]] = False, unknown: Optional[str] = None)

Bases: marshmallow.schema.Schema

Schema for recording an error that occurred while processing a job

class zerog.jobs.error.Error(**kwargs)

Bases: object

Error object used to record errors in a ZeroG BaseJob