Saturday, February 21, 2015

Extending the parallel programming library

Recently Robert Love blogged about Exception Management in the PPL (let's please stick to this abbreviation since that is what Embarcadero calls it).

What I was missing though was handling exceptions in fire and forget tasks since you usually don't have some place that calls wait on them just to get the exception being raised.

So I quickly hacked together some stuff to show how to use a feature from the TLP (that's the .NET one): ContinueWith. From its documentation:

Creates a continuation that executes asynchronously when the target Task completes.

Easy enough. We got all the pieces to create this - how this can be done has been shown previously. Too bad he did not make a method of it but hacked it all into a button click event. :(

Did I just hear anyone say: "Hey, that would have been a use-case for an interface helper, right!?" back there? Well, you are right...

Enough talk - let's look at the code. Keep in mind this is just some quick and dirty example to show how to extend TTask in a clean way to add new features. Let's hope they will come out of the box with the next version because continuations on tasks are a must have imho.

unit ThreadingEx;

interface

uses
  SysUtils,
  Threading;

type
  TAction<T> = reference to procedure(const arg: T);

  TTaskContinuationOptions = (
    NotOnCompleted,
    NotOnFaulted,
    NotOnCanceled,
    OnlyOnCompleted,
    OnlyOnFaulted,
    OnlyOnCanceled
  );

  ITaskEx = interface(ITask)
    ['{3AE1A614-27AA-4B5A-BC50-42483650E20D}']
    function GetExceptObj: Exception;
    function GetStatus: TTaskStatus;
    function ContinueWith(const continuationAction: TAction<ITaskEx>;
      continuationOptions: TTaskContinuationOptions): ITaskEx;

    property ExceptObj: Exception read GetExceptObj;
    property Status: TTaskStatus read GetStatus;
  end;

  TTaskEx = class(TTask, ITaskEx)
  private
    fExceptObj: Exception;
    function GetExceptObj: Exception;
  protected
    function ContinueWith(const continuationAction: TAction<ITaskEx>;
      continuationOptions: TTaskContinuationOptions): ITaskEx;
  public
    destructor Destroy; override;

    class function Run(const action: TProc): ITaskEx; static;
  end;

implementation

uses
  Classes;

{ TTaskEx }

function TTaskEx.ContinueWith(const continuationAction: TAction<ITaskEx>;
  continuationOptions: TTaskContinuationOptions): ITaskEx;
begin
  Result := TTaskEx.Run(
    procedure
    var
      task: ITaskEx;
      doContinue: Boolean;
    begin
      task := Self;
      if not IsComplete then
        DoneEvent.WaitFor;
      fExceptObj := GetExceptionObject;
      case continuationOptions of
        NotOnCompleted:  doContinue := GetStatus <> TTaskStatus.Completed;
        NotOnFaulted:    doContinue := GetStatus <> TTaskStatus.Exception;
        NotOnCanceled:   doContinue := GetStatus <> TTaskStatus.Canceled;
        OnlyOnCompleted: doContinue := GetStatus = TTaskStatus.Completed;
        OnlyOnFaulted:   doContinue := GetStatus = TTaskStatus.Exception;
        OnlyOnCanceled:  doContinue := GetStatus = TTaskStatus.Canceled;
      else
        doContinue := False;
      end;
      if doContinue then
        continuationAction(task);
    end);
end;

destructor TTaskEx.Destroy;
begin
  fExceptObj.Free;
  inherited;
end;

function TTaskEx.GetExceptObj: Exception;
begin
  Result := fExceptObj;
end;

class function TTaskEx.Run(const action: TProc): ITaskEx;
var
  task: TTaskEx;
begin
  task := TTaskEx.Create(nil, TNotifyEvent(nil), action, TThreadPool.Default, nil);
  Result := task.Start as ITaskEx;
end;

end.

So what I did is add the ContinueWith method here that takes the delegate that gets executed when the previous task finished with a certain state. I also added properties for the Status and the Exception that might have been raised.

How can this be used?

  TTaskEx.Run(
    procedure
    begin
      Sleep(2000);
      raise EProgrammerNotFound.Create('whoops')
    end)
    .ContinueWith(
    procedure(const t: ITaskEx)
    begin
      TThread.Queue(nil,
        procedure
        begin
          ShowMessage(t.ExceptObj.Message);
        end);
    end, OnlyOnFaulted);

This executes the first task which Sleeps 2 seconds and then raises an exception. This would not only leak memory (!) but also there is no possibility to handle this exception unless you keep a reference to this task and then call Wait somewhere. But this would limit its use very much. Instead we call ContinueWith now passing the error reporting delegate and OnlyOnFaulted in order to execute this only if the previous task had an error.

Easy enough, isn't it?