Jobs
jobs.base
ZeroG BaseJobSchema and BaseJob class definitions
- exception zerog.jobs.base.ErrorContinue
Bases:
Exception
- exception zerog.jobs.base.ErrorFinish
Bases:
Exception
- exception zerog.jobs.base.WarningContinue
Bases:
Exception
- exception zerog.jobs.base.WarningFinish
Bases:
Exception
- class zerog.jobs.base.BaseJobSchema(*, only: Optional[Union[Sequence[str], Set[str]]] = None, exclude: Union[Sequence[str], Set[str]] = (), many: bool = False, context: Optional[Dict] = None, load_only: Union[Sequence[str], Set[str]] = (), dump_only: Union[Sequence[str], Set[str]] = (), partial: Union[bool, Sequence[str], Set[str]] = False, unknown: Optional[str] = None)
Bases:
marshmallow.schema.Schema
BaseJob persisted attributes
- Variables
documentType (str) – used to create datastore key
jobType (str) – used to get registered job schema
schemaVersion (float) – used to update schemas
createdAt (datetime) – time job was created
updatedAt (datetime) – time job was last updated
cas (str) – used to prevent job update collisions
uuid (str) – unique job identifier
logId (str) – job id to show in logs
queueName (str) – name of queue for job
queueKwargs (dict) – keyword args used to enqueue job
queueJobId (int) – id of job in queue
events (list) – list of logged events for job
errors (list) – list of logged errors for job
warnings (list) – list of logged warnings for job
running (boolean) – True if job is currently running
errorCount (int) – number of times job has had an exception
completeness (float) – completion percentage 0.0 - 1.0
tickcount (float) – record of completeness ticks
tickval (float) – completeness increment per tick
resultCode (int) – job resultCode, -1 if incomplete, 200 for success
- class zerog.jobs.base.BaseJob(datastore, queue, keepalive=None, **kwargs)
Bases:
abc.ABC
The base class for all ZeroG jobs.
- Variables
DOCUMENT_TYPE (str) – document type used to make the datastore key. NEVER override this attribute
SCHEMA_VERSION (str) – version which can be used to manage schema changes. You MAY override this attribute
SCHEMA (class) – the marshmallow schema used to serialize/deserialize this job. You MAY override this attribute to add fields to the base schema. The schema must be a subclass of BaseJobSchema
JOB_TYPE (str) – a unique string identifying this type of job. You MUST override this attribute.
MAX_ERRORS (int) – maximum number of error retries before the job fails. You MAY override this attribute.
Subclasses MUST override the run() method.
- __init__(datastore, queue, keepalive=None, **kwargs)
Initialize the job with deserialized data.
Subclasses MUST override this method if they use a subclass of BaseJobSchema to add fields.
If overriding this method, you MUST call the parent __init__() using super.
This __init__() method is the opportunity to load any extra fields that are declared in the associated schema.
Required fields can be loaded directly by referencing their key.
Optional fields need to be loaded using the dictionary’s get method, which gives an opportunity to load the field with a default value if it isn’t present in the input data.
Example:
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.requiredField = kwargs['requiredField']
    self.optionalField = kwargs.get('optionalField', "default")
- dump()
Serialize the job according to the job schema.
- Returns
serialized job data
- Return type
native Python data types
- dumps(**kwargs)
Serialize the job according to the job schema.
- Returns
serialized job data
- Return type
JSON-encoded string
- key()
Constructs the datastore key for this job
- Returns
datastore key for this job
- Return type
str
- save()
Saves job instance to the datastore. Fails if job was updated in the datastore since this instance was last updated.
- Returns
None
- reload()
Reload job data from the datastore and update this instance.
This may be necessary if another python instance has updated this job in the datastore and the cas loaded here is out of date.
- Returns
None
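The save() and reload() descriptions above imply an optimistic-concurrency (compare-and-swap) pattern built on the cas attribute. The sketch below is not ZeroG's implementation: FakeDatastore, StaleCasError, and FakeJob are hypothetical stand-ins, and the integer cas is a simplification (the schema lists cas as a str). It only illustrates why a stale cas makes a save fail and how a reload recovers.

```python
class StaleCasError(Exception):
    """Hypothetical error: the stored cas differs from the caller's cas."""


class FakeDatastore:
    """Hypothetical in-memory datastore with compare-and-swap writes."""

    def __init__(self):
        self._docs = {}  # key -> (cas, data)

    def read(self, key):
        return self._docs[key]

    def write(self, key, data, cas=None):
        current = self._docs.get(key)
        # Reject the write if someone else updated the document first.
        if current is not None and current[0] != cas:
            raise StaleCasError(key)
        new_cas = 1 if current is None else current[0] + 1
        self._docs[key] = (new_cas, dict(data))
        return new_cas


class FakeJob:
    """Hypothetical stand-in for a job that tracks its own cas."""

    def __init__(self, datastore, key):
        self.datastore = datastore
        self.key = key
        self.cas = None
        self.data = {"running": False}

    def save(self):
        self.cas = self.datastore.write(self.key, self.data, cas=self.cas)

    def reload(self):
        self.cas, data = self.datastore.read(self.key)
        self.data = dict(data)


store = FakeDatastore()
first = FakeJob(store, "job_1")
first.save()                  # creates the document

second = FakeJob(store, "job_1")
second.reload()               # a second instance loads the same document
second.data["running"] = True
second.save()                 # bumps the stored cas

try:
    first.save()              # first's cas is now out of date
    stale = False
except StaleCasError:
    stale = True
    first.reload()            # pick up the current cas and data
    first.save()              # succeeds with the fresh cas
```

In this sketch the failed save leaves the stored document untouched, so reloading before retrying does not lose the other instance's update.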
- update_attrs(**kwargs)
Updates a job’s attributes. Only updates the attributes specified in the keyword arguments. Saves the update to the datastore.
Example:
self.update_attrs(queueJobId=10, running=True)
- Returns
None
- record_event(msg)
Records an event in the job’s events list.
- Parameters
msg (str) – the event message
- Returns
None
- record_warning(msg)
Records a warning in the job’s warnings list.
- Parameters
msg (str) – the warning message
- Returns
None
- record_error(errorCode, msg, exception=None)
Records an error in the job’s errors list.
The exception keyword argument is passed so that a job can override this method to add extra error handling for specific exceptions.
- Parameters
errorCode (int) – an error code associated with the error
msg (str) – the error message
exception (object) – the exception that caused the error if the error is the result of an unhandled exception
- Returns
None
- record_result(resultCode)
Record the result of a job. This method is called by the base worker when a job completes, so it does not need to be explicitly called in most cases.
- job_log_info(msg)
Records an event in the job’s events list and logs it using the current Python logger.
- Parameters
msg (str) – the event message
- Returns
None
- job_log_warning(msg)
Records a warning in the job’s warnings list and logs it using the current Python logger.
- Parameters
msg (str) – the warning message
- Returns
None
- job_log_error(errorCode, msg, exception=None)
Records an error in the job’s errors list and logs it using the current Python logger.
- Parameters
errorCode (int) – an error code associated with the error
msg (str) – the error message
exception (object) – the exception that caused the error if the error is the result of an unhandled exception
- Returns
None
- raise_warning_continue(resultCode, msg)
Interrupts job execution and records a warning. The job may continue after being requeued.
- Parameters
msg (str) – the warning message
- raise_warning_finish(resultCode, msg)
Interrupts job execution, records a warning, and terminates the job.
- Parameters
msg (str) – the warning message
- raise_error_continue(errorCode, msg)
Interrupts job execution and records an error. The job may continue after being requeued.
- Parameters
errorCode (int) – an error code associated with the error
msg (str) – the error message
- raise_error_finish(errorCode, msg)
Interrupts job execution, records an error, and terminates the job.
- Parameters
errorCode (int) – an error code associated with the error
msg (str) – the error message
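The four raise_* methods pair a severity (warning or error) with an outcome (continue after requeue, or finish). The sketch below shows one way a worker could tell them apart. It is an assumption for illustration, not the documented base worker: the exception classes are local stand-ins that reuse the names from zerog.jobs.base, the (code, msg) arguments are assumed, and classify() is hypothetical.

```python
class WarningContinue(Exception):
    """Stand-in for zerog.jobs.base.WarningContinue."""


class WarningFinish(Exception):
    """Stand-in for zerog.jobs.base.WarningFinish."""


class ErrorContinue(Exception):
    """Stand-in for zerog.jobs.base.ErrorContinue."""


class ErrorFinish(Exception):
    """Stand-in for zerog.jobs.base.ErrorFinish."""


def classify(step):
    """Run step() and return (severity, action) as a hypothetical worker might."""
    try:
        step()
    except (WarningContinue, ErrorContinue) as exc:
        severity = "warning" if isinstance(exc, WarningContinue) else "error"
        return severity, "requeue"
    except (WarningFinish, ErrorFinish) as exc:
        severity = "warning" if isinstance(exc, WarningFinish) else "error"
        return severity, "finish"
    return None, "done"


def flaky_step():
    # Assumed argument order (code, msg), mirroring raise_error_continue.
    raise ErrorContinue(503, "upstream service unavailable")


def fatal_step():
    raise ErrorFinish(400, "invalid input")
```

Here classify(flaky_step) reports an error that may be requeued, while classify(fatal_step) reports an error that ends the job.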
- set_completeness(completeness)
Sets the absolute value of the job’s completeness. Clamps value to a range of 0.0 to 1.0
- Parameters
completeness (float) – absolute completeness value
- Returns
None
- add_to_completeness(delta)
Increment the job’s completeness. Adds any unrecorded ticks. Resulting completeness will be clamped to a range of 0.0 to 1.0.
- Parameters
delta (float) – amount by which to increment completeness
- Returns
None
- set_tick_value(tickval)
Sets the amount the job’s completeness will be incremented by a call to the tick method
- Parameters
tickval (float) – amount to increment completeness for each tick
- Returns
None
- tick()
Accumulates the job’s tickcount. Adds tickcount to completeness when it is >= 0.01
- Returns
None
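The completeness methods above can be read as simple arithmetic: ticks accumulate in tickcount and are folded into completeness once they reach 0.01, and every update is clamped to the range 0.0 to 1.0. The class below is a hypothetical stand-in that follows those descriptions. It is not the BaseJob code and leaves out any datastore writes.

```python
class CompletenessTracker:
    """Hypothetical stand-in mirroring the documented completeness rules."""

    FLUSH_THRESHOLD = 0.01  # threshold taken from the tick() description

    def __init__(self):
        self.completeness = 0.0
        self.tickcount = 0.0
        self.tickval = 0.0

    def set_completeness(self, completeness):
        # Clamp to the documented range of 0.0 to 1.0.
        self.completeness = min(1.0, max(0.0, completeness))

    def add_to_completeness(self, delta):
        # Fold in any ticks that have not been recorded yet.
        self.set_completeness(self.completeness + delta + self.tickcount)
        self.tickcount = 0.0

    def set_tick_value(self, tickval):
        self.tickval = tickval

    def tick(self):
        self.tickcount += self.tickval
        if self.tickcount >= self.FLUSH_THRESHOLD:
            self.add_to_completeness(0.0)


tracker = CompletenessTracker()
tracker.set_tick_value(0.004)
tracker.tick()
tracker.tick()                   # 0.008 accumulated, below the threshold
after_two = tracker.completeness
tracker.tick()                   # 0.012 accumulated, folded into completeness
after_three = tracker.completeness
```

With a tick value of 0.004, completeness stays unchanged for the first two ticks and moves only on the third, when the accumulated ticks cross the threshold.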
- enqueue(**kwargs)
Add a job to its queue.
Sets the job’s queueJobId if enqueueing is successful. Sets it to -1 if enqueueing fails.
- Parameters
kwargs (dict) – keyword arguments that will be passed to the queueing client
- Returns
None
- progress()
Returns a job’s completeness and result.
- Returns
current values of completeness & resultCode
- Return type
dict
- info()
Returns a job’s completeness, result, events, warnings, and errors.
- Returns
current values of completeness, resultCode, events, warnings, and errors
- Return type
dict
- get_data()
Returns result data for this job.
Override this method if the job needs to return data once it is complete.
- Returns
output data
- Return type
dict
- continue_running()
Called by the worker after a job is interrupted by an exception to determine whether the job should be requeued to continue running.
The default is to terminate after self.MAX_ERRORS errors have been recorded.
Override this method as needed for more complex error handling.
- Returns
NO_RESULT (-1) if the job should continue. INTERNAL_ERROR (500) if the job should terminate
- Return type
int
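As a rough illustration of overriding continue_running(), the sketch below adds a deadline check on top of an assumed default. JobStandIn and DeadlineJob are hypothetical. Only the NO_RESULT (-1) and INTERNAL_ERROR (500) values and the errorCount and MAX_ERRORS names come from the text above. Whether the real default compares with >= or > is not stated, so the comparison here is an assumption.

```python
NO_RESULT = -1        # job should continue (value from the text above)
INTERNAL_ERROR = 500  # job should terminate (value from the text above)


class JobStandIn:
    """Hypothetical stand-in exposing only errorCount and MAX_ERRORS."""

    MAX_ERRORS = 3

    def __init__(self):
        self.errorCount = 0

    def continue_running(self):
        # Assumed default: stop once errorCount reaches MAX_ERRORS.
        if self.errorCount >= self.MAX_ERRORS:
            return INTERNAL_ERROR
        return NO_RESULT


class DeadlineJob(JobStandIn):
    """Hypothetical override: also give up once a deadline has passed."""

    def __init__(self, deadline, clock):
        super().__init__()
        self.deadline = deadline
        self.clock = clock  # callable returning the current time

    def continue_running(self):
        if self.clock() > self.deadline:
            return INTERNAL_ERROR
        # Otherwise fall back to the error-count rule.
        return super().continue_running()


on_time = DeadlineJob(deadline=100, clock=lambda: 50)
late = DeadlineJob(deadline=100, clock=lambda: 101)
```

Passing the clock in as a callable keeps the override easy to test without waiting for real time to pass.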
- abstract run()
This method MUST be overridden.
This method executes the job. It is called by the base worker once the job has been successfully loaded.
- Returns
resultCode for the job. Return NO_RESULT if job needs to be requeued for further processing. Otherwise use HTTP resultCodes (200s for success, etc.)
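A minimal sketch of the run() contract, assuming a job that does its work in several passes. BaseJobStandIn and ChunkedSumJob are hypothetical: the stand-in omits the datastore, queue, and schema that a real BaseJob subclass would need, and in a real job the position and total fields would presumably have to be declared in the schema so they survive requeueing.

```python
import abc

NO_RESULT = -1  # value given for NO_RESULT in the continue_running() entry


class BaseJobStandIn(abc.ABC):
    """Hypothetical stand-in for BaseJob (no datastore, queue, or schema)."""

    JOB_TYPE = None

    @abc.abstractmethod
    def run(self):
        """Return a resultCode, or NO_RESULT to be requeued."""


class ChunkedSumJob(BaseJobStandIn):
    """Hypothetical job that sums a list, one chunk per run() call."""

    JOB_TYPE = "chunked_sum"

    def __init__(self, values, chunk_size=2):
        self.values = list(values)
        self.chunk_size = chunk_size
        self.position = 0
        self.total = 0

    def run(self):
        chunk = self.values[self.position:self.position + self.chunk_size]
        self.total += sum(chunk)
        self.position += len(chunk)
        if self.position < len(self.values):
            return NO_RESULT  # more work left: ask to be requeued
        return 200            # finished successfully

    def get_data(self):
        return {"total": self.total}


job = ChunkedSumJob([1, 2, 3, 4, 5])
codes = []
while True:
    # Stand-in for the worker calling run() again after each requeue.
    code = job.run()
    codes.append(code)
    if code != NO_RESULT:
        break
```

The loop at the end plays the role of the worker: it keeps calling run() while NO_RESULT comes back and stops at the first real result code.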
- zerog.jobs.base.make_key(uuid)
Makes a unique datastore key for a job.
- Parameters
uuid (str) – uuid of the job
- Returns
datastore key
- Return type
str
jobs.event
Copyright (c) 2017 MotiveMetrics. All rights reserved.
- class zerog.jobs.event.EventSchema(*, only: Optional[Union[Sequence[str], Set[str]]] = None, exclude: Union[Sequence[str], Set[str]] = (), many: bool = False, context: Optional[Dict] = None, load_only: Union[Sequence[str], Set[str]] = (), dump_only: Union[Sequence[str], Set[str]] = (), partial: Union[bool, Sequence[str], Set[str]] = False, unknown: Optional[str] = None)
Bases:
marshmallow.schema.Schema
Schema for recording an event that occurred while processing a job
jobs.warning
Copyright (c) 2020 MotiveMetrics. All rights reserved.
- class zerog.jobs.warning.WarningSchema(*, only: Optional[Union[Sequence[str], Set[str]]] = None, exclude: Union[Sequence[str], Set[str]] = (), many: bool = False, context: Optional[Dict] = None, load_only: Union[Sequence[str], Set[str]] = (), dump_only: Union[Sequence[str], Set[str]] = (), partial: Union[bool, Sequence[str], Set[str]] = False, unknown: Optional[str] = None)
Bases:
marshmallow.schema.Schema
Schema for recording a warning that occurred while processing a job
jobs.error
ZeroG ErrorSchema and Error class definitions
- class zerog.jobs.error.ErrorSchema(*, only: Optional[Union[Sequence[str], Set[str]]] = None, exclude: Union[Sequence[str], Set[str]] = (), many: bool = False, context: Optional[Dict] = None, load_only: Union[Sequence[str], Set[str]] = (), dump_only: Union[Sequence[str], Set[str]] = (), partial: Union[bool, Sequence[str], Set[str]] = False, unknown: Optional[str] = None)
Bases:
marshmallow.schema.Schema
Schema for recording an error that occurred while processing a job
- class zerog.jobs.error.Error(**kwargs)
Bases:
object
Error object used to record errors in a ZeroG BaseJob